From 96ee457f821c8b280d9baa0f00206d1438d31bb1 Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Thu, 20 Feb 2025 14:45:54 +0200 Subject: [PATCH 01/15] Add GroupingPartitionRouter --- .../declarative_component_schema.yaml | 43 +++++++ .../models/declarative_component_schema.py | 45 ++++++- .../parsers/model_to_component_factory.py | 24 ++++ .../declarative/partition_routers/__init__.py | 4 + .../grouping_partition_router.py | 116 ++++++++++++++++++ 5 files changed, 229 insertions(+), 3 deletions(-) create mode 100644 airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 7bf5c4fa3..9a2c6d848 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -3130,12 +3130,14 @@ definitions: - "$ref": "#/definitions/CustomPartitionRouter" - "$ref": "#/definitions/ListPartitionRouter" - "$ref": "#/definitions/SubstreamPartitionRouter" + - "$ref": "#/definitions/GroupingPartitionRouter" - type: array items: anyOf: - "$ref": "#/definitions/CustomPartitionRouter" - "$ref": "#/definitions/ListPartitionRouter" - "$ref": "#/definitions/SubstreamPartitionRouter" + - "$ref": "#/definitions/GroupingPartitionRouter" decoder: title: Decoder description: Component decoding the response so records can be extracted. @@ -3290,12 +3292,14 @@ definitions: - "$ref": "#/definitions/CustomPartitionRouter" - "$ref": "#/definitions/ListPartitionRouter" - "$ref": "#/definitions/SubstreamPartitionRouter" + - "$ref": "#/definitions/GroupingPartitionRouter" - type: array items: anyOf: - "$ref": "#/definitions/CustomPartitionRouter" - "$ref": "#/definitions/ListPartitionRouter" - "$ref": "#/definitions/SubstreamPartitionRouter" + - "$ref": "#/definitions/GroupingPartitionRouter" decoder: title: Decoder description: Component decoding the response so records can be extracted. @@ -3412,6 +3416,45 @@ definitions: $parameters: type: object additionalProperties: true + GroupingPartitionRouter: + title: Grouping Partition Router + description: > + A decorator on top of a partition router that groups partitions into batches of a specified size. + This is useful for APIs that support filtering by multiple partition keys in a single request. + Note that per-partition incremental syncs may not work as expected because the grouping + of partitions might change between syncs, potentially leading to inconsistent state tracking. + type: object + required: + - type + - group_size + - underlying_partition_router + properties: + type: + type: string + enum: [GroupingPartitionRouter] + group_size: + title: Group Size + description: The number of partitions to include in each group. This determines how many partition values are batched together in a single slice. + type: integer + minimum: 1 + examples: + - 10 + - 50 + partition_router: + title: Underlying Partition Router + description: The partition router whose output will be grouped. This can be any valid partition router component. + anyOf: + - "$ref": "#/definitions/CustomPartitionRouter" + - "$ref": "#/definitions/ListPartitionRouter" + - "$ref": "#/definitions/SubstreamPartitionRouter" + deduplicate: + title: Deduplicate Partitions + description: If true, ensures that partitions are unique within each group by removing duplicates based on the partition key. 
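To make the deduplication rule above concrete, here is a minimal stand-alone sketch of the filtering loop (hypothetical values; it mirrors the stream_slices implementation added later in this patch):

    seen_keys = set()
    batches, batch = [], []
    for partition in [{"board_ids": 0}, {"board_ids": 0}, {"board_ids": 1}, {"board_ids": 2}]:
        key = next(iter(partition.values()), None)  # single partition key, e.g. a board id
        if key in seen_keys:
            continue  # duplicate partition keys are dropped when deduplicate is true
        seen_keys.add(key)
        batch.append(key)
        if len(batch) == 2:  # group_size
            batches.append(batch)
            batch = []
    if batch:
        batches.append(batch)  # flush the remainder
    assert batches == [[0, 1], [2]]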
+ type: boolean + default: true + $parameters: + type: object + additionalProperties: true WaitUntilTimeFromHeader: title: Wait Until Time Defined In Response Header description: Extract time at which we can retry the request from response header and wait for the difference between now and that time. diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 8f5be6867..63667aada 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -6,7 +6,7 @@ from enum import Enum from typing import Any, Dict, List, Literal, Optional, Union -from pydantic.v1 import BaseModel, Extra, Field +from pydantic.v1 import BaseModel, Extra, Field, conint class AuthFlowType(Enum): @@ -2225,7 +2225,15 @@ class SimpleRetriever(BaseModel): CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, - List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]], + GroupingPartitionRouter, + List[ + Union[ + CustomPartitionRouter, + ListPartitionRouter, + SubstreamPartitionRouter, + GroupingPartitionRouter, + ] + ], ] ] = Field( [], @@ -2303,7 +2311,15 @@ class AsyncRetriever(BaseModel): CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, - List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]], + GroupingPartitionRouter, + List[ + Union[ + CustomPartitionRouter, + ListPartitionRouter, + SubstreamPartitionRouter, + GroupingPartitionRouter, + ] + ], ] ] = Field( [], @@ -2355,6 +2371,29 @@ class SubstreamPartitionRouter(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class GroupingPartitionRouter(BaseModel): + type: Literal["GroupingPartitionRouter"] + group_size: conint(ge=1) = Field( + ..., + description="The number of partitions to include in each group. This determines how many partition values are batched together in a single slice.", + examples=[10, 50], + title="Group Size", + ) + partition_router: Optional[ + Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter] + ] = Field( + None, + description="The partition router whose output will be grouped. 
This can be any valid partition router component.", + title="Underlying Partition Router", + ) + deduplicate: Optional[bool] = Field( + True, + description="If true, ensures that partitions are unique within each group by removing duplicates based on the partition key.", + title="Deduplicate Partitions", + ) + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + class HttpComponentsResolver(BaseModel): type: Literal["HttpComponentsResolver"] retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever] = Field( diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 739d15825..2da2d1d70 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -227,6 +227,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( FlattenFields as FlattenFieldsModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + GroupingPartitionRouter as GroupingPartitionRouterModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( GzipDecoder as GzipDecoderModel, ) @@ -379,6 +382,7 @@ ) from airbyte_cdk.sources.declarative.partition_routers import ( CartesianProductStreamSlicer, + GroupingPartitionRouter, ListPartitionRouter, PartitionRouter, SinglePartitionRouter, @@ -3044,3 +3048,23 @@ def set_api_budget(self, component_definition: ComponentDefinition, config: Conf self._api_budget = self.create_component( model_type=HTTPAPIBudgetModel, component_definition=component_definition, config=config ) + + def create_grouping_partition_router( + self, model: GroupingPartitionRouterModel, config: Config, **kwargs: Any + ) -> GroupingPartitionRouter: + underlying_router = self._create_component_from_model( + model=model.partition_router, config=config + ) + + if not isinstance(underlying_router, PartitionRouter): + raise ValueError( + f"Underlying partition router must be a PartitionRouter subclass, got {type(underlying_router)}" + ) + + return GroupingPartitionRouter( + group_size=model.group_size, + underlying_partition_router=underlying_router, + deduplicate=model.deduplicate if model.deduplicate is not None else True, + parameters=model.parameters or {}, + config=config, + ) diff --git a/airbyte_cdk/sources/declarative/partition_routers/__init__.py b/airbyte_cdk/sources/declarative/partition_routers/__init__.py index f35647402..2e99286d2 100644 --- a/airbyte_cdk/sources/declarative/partition_routers/__init__.py +++ b/airbyte_cdk/sources/declarative/partition_routers/__init__.py @@ -8,6 +8,9 @@ from airbyte_cdk.sources.declarative.partition_routers.cartesian_product_stream_slicer import ( CartesianProductStreamSlicer, ) +from airbyte_cdk.sources.declarative.partition_routers.grouping_partition_router import ( + GroupingPartitionRouter, +) from airbyte_cdk.sources.declarative.partition_routers.list_partition_router import ( ListPartitionRouter, ) @@ -22,6 +25,7 @@ __all__ = [ "AsyncJobPartitionRouter", "CartesianProductStreamSlicer", + "GroupingPartitionRouter", "ListPartitionRouter", "SinglePartitionRouter", "SubstreamPartitionRouter", diff --git a/airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py b/airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py new file mode 100644 index 000000000..db7875709 --- /dev/null +++ 
b/airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py @@ -0,0 +1,116 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + +from dataclasses import dataclass +from typing import Any, Iterable, Mapping, Optional + +from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter +from airbyte_cdk.sources.types import Config, StreamSlice, StreamState + + +@dataclass +class GroupingPartitionRouter(PartitionRouter): + """ + A partition router that groups partitions from an underlying partition router into batches of a specified size. + This is useful for APIs that support filtering by multiple partition keys in a single request. + + Attributes: + group_size (int): The number of partitions to include in each group. + underlying_partition_router (SinglePartitionRouter): The partition router whose output will be grouped. + deduplicate (bool): If True, ensures unique partitions within each group by removing duplicates based on the partition key. + config (Config): The connector configuration. + parameters (Mapping[str, Any]): Additional parameters for interpolation and configuration. + """ + + group_size: int + underlying_partition_router: PartitionRouter + config: Config + deduplicate: bool = True + + def stream_slices(self) -> Iterable[StreamSlice]: + """ + Lazily groups partitions from the underlying partition router into batches of size `group_size`. + + This method processes partitions one at a time from the underlying router, maintaining a batch buffer. + When the buffer reaches `group_size` or the underlying router is exhausted, it yields a grouped slice. + If deduplication is enabled, it tracks seen partition keys to ensure uniqueness within the current batch. + + Yields: + Iterable[StreamSlice]: An iterable of StreamSlice objects, where each slice contains a batch of partition values. 
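+
+        Example (illustrative, matching the unit tests added later in this series):
+            with group_size=2, deduplicate=True, and an underlying router yielding
+            partitions {"board_ids": 0} through {"board_ids": 4}, this yields
+            {"board_ids": [0, 1]}, {"board_ids": [2, 3]}, and {"board_ids": [4]}.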
+ """ + batch = [] + seen_keys = set() if self.deduplicate else None + + # Iterate over partitions lazily from the underlying router + for partition in self.underlying_partition_router.stream_slices(): + # Extract the partition key (assuming single key-value pair, e.g., {"board_ids": value}) + key = next(iter(partition.partition.values()), None) + + # Skip duplicates if deduplication is enabled + if self.deduplicate and key in seen_keys: + continue + + # Add partition to the batch + batch.append(partition) + if self.deduplicate: + seen_keys.add(key) + + # Yield the batch when it reaches the group_size + if len(batch) == self.group_size: + yield self._create_grouped_slice(batch) + batch = [] # Reset the batch + + # Yield any remaining partitions if the batch isn't empty + if batch: + yield self._create_grouped_slice(batch) + + def _create_grouped_slice(self, batch: list[StreamSlice]) -> StreamSlice: + # Combine partition values into a single dict with lists + grouped_partition = { + key: [p.partition.get(key) for p in batch] for key in batch[0].partition.keys() + } + return StreamSlice( + partition=grouped_partition, + cursor_slice={}, # Cursor is managed by the underlying router or incremental sync + ) + + def get_request_params( + self, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: + return {} + + def get_request_headers( + self, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: + return {} + + def get_request_body_data( + self, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: + return {} + + def get_request_body_json( + self, + stream_state: Optional[StreamState] = None, + stream_slice: Optional[StreamSlice] = None, + next_page_token: Optional[Mapping[str, Any]] = None, + ) -> Mapping[str, Any]: + return {} + + def set_initial_state(self, stream_state: StreamState) -> None: + """Delegate state initialization to the underlying partition router.""" + self.underlying_partition_router.set_initial_state(stream_state) + + def get_stream_state(self) -> Optional[Mapping[str, StreamState]]: + """Delegate state retrieval to the underlying partition router.""" + return self.underlying_partition_router.get_stream_state() From cbf6328425c472b5cc3beccd21c41f50bbb07cea Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Fri, 21 Feb 2025 17:36:47 +0200 Subject: [PATCH 02/15] Fix parameter name --- .../declarative_component_schema.yaml | 2 +- .../models/declarative_component_schema.py | 84 +++++++++++-------- .../parsers/model_to_component_factory.py | 2 +- 3 files changed, 53 insertions(+), 35 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 9a2c6d848..2aecc676a 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -3440,7 +3440,7 @@ definitions: examples: - 10 - 50 - partition_router: + underlying_partition_router: title: Underlying Partition Router description: The partition router whose output will be grouped. This can be any valid partition router component. 
anyOf: diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 63667aada..4bc661612 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -609,7 +609,9 @@ class OAuthAuthenticator(BaseModel): scopes: Optional[List[str]] = Field( None, description="List of scopes that should be granted to the access token.", - examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]], + examples=[ + ["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"] + ], title="Scopes", ) token_expiry_date: Optional[str] = Field( @@ -1078,24 +1080,28 @@ class OAuthConfigSpecification(BaseModel): class Config: extra = Extra.allow - oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field( - None, - description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", - examples=[ - {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, - { - "app_id": { - "type": "string", - "path_in_connector_config": ["info", "app_id"], - } - }, - ], - title="OAuth user input", + oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = ( + Field( + None, + description="OAuth specific blob. 
This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", + examples=[ + {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, + { + "app_id": { + "type": "string", + "path_in_connector_config": ["info", "app_id"], + } + }, + ], + title="OAuth user input", + ) ) - oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field( - None, - description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', - title="DeclarativeOAuth Connector Specification", + oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = ( + Field( + None, + description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The 
variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', + title="DeclarativeOAuth Connector Specification", + ) ) complete_oauth_output_specification: Optional[Dict[str, Any]] = Field( None, @@ -1113,7 +1119,9 @@ class Config: complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field( None, description="OAuth specific blob. 
This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }", - examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}], + examples=[ + {"client_id": {"type": "string"}, "client_secret": {"type": "string"}} + ], title="OAuth input specification", ) complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field( @@ -1744,7 +1752,9 @@ class RecordSelector(BaseModel): description="Responsible for filtering records to be emitted by the Source.", title="Record Filter", ) - schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field( + schema_normalization: Optional[ + Union[SchemaNormalization, CustomSchemaNormalization] + ] = Field( SchemaNormalization.None_, description="Responsible for normalization according to the schema.", title="Schema Normalization", @@ -1948,12 +1958,16 @@ class Config: description="Component used to coordinate how records are extracted across stream slices and request pages.", title="Retriever", ) - incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = Field( - None, - description="Component used to fetch data incrementally based on a time field in the data.", - title="Incremental Sync", + incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = ( + Field( + None, + description="Component used to fetch data incrementally based on a time field in the data.", + title="Incremental Sync", + ) + ) + name: Optional[str] = Field( + "", description="The stream name.", example=["Users"], title="Name" ) - name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name") primary_key: Optional[PrimaryKey] = Field( "", description="The primary key of the stream.", title="Primary Key" ) @@ -2277,7 +2291,9 @@ class AsyncRetriever(BaseModel): ) download_extractor: Optional[ Union[CustomRecordExtractor, DpathExtractor, ResponseToFileExtractor] - ] = Field(None, description="Responsible for fetching the records from provided urls.") + ] = Field( + None, description="Responsible for fetching the records from provided urls." + ) creation_requester: Union[CustomRequester, HttpRequester] = Field( ..., description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.", @@ -2379,10 +2395,10 @@ class GroupingPartitionRouter(BaseModel): examples=[10, 50], title="Group Size", ) - partition_router: Optional[ - Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter] + underlying_partition_router: Union[ + CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter ] = Field( - None, + ..., description="The partition router whose output will be grouped. 
This can be any valid partition router component.", title="Underlying Partition Router", ) @@ -2410,10 +2426,12 @@ class DynamicDeclarativeStream(BaseModel): stream_template: DeclarativeStream = Field( ..., description="Reference to the stream template.", title="Stream Template" ) - components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = Field( - ..., - description="Component resolve and populates stream templates with components values.", - title="Components Resolver", + components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = ( + Field( + ..., + description="Component resolve and populates stream templates with components values.", + title="Components Resolver", + ) ) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 2da2d1d70..125fff1e2 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -3053,7 +3053,7 @@ def create_grouping_partition_router( self, model: GroupingPartitionRouterModel, config: Config, **kwargs: Any ) -> GroupingPartitionRouter: underlying_router = self._create_component_from_model( - model=model.partition_router, config=config + model=model.underlying_partition_router, config=config ) if not isinstance(underlying_router, PartitionRouter): From 26455ca13c4e5bd642a275e57d4d55fcc6a69c53 Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Mon, 24 Feb 2025 17:44:36 +0200 Subject: [PATCH 03/15] Add extra fields --- .../grouping_partition_router.py | 22 ++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py b/airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py index db7875709..bea875559 100644 --- a/airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py +++ b/airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py @@ -17,7 +17,7 @@ class GroupingPartitionRouter(PartitionRouter): Attributes: group_size (int): The number of partitions to include in each group. - underlying_partition_router (SinglePartitionRouter): The partition router whose output will be grouped. + underlying_partition_router (PartitionRouter): The partition router whose output will be grouped. deduplicate (bool): If True, ensures unique partitions within each group by removing duplicates based on the partition key. config (Config): The connector configuration. parameters (Mapping[str, Any]): Additional parameters for interpolation and configuration. @@ -66,13 +66,33 @@ def stream_slices(self) -> Iterable[StreamSlice]: yield self._create_grouped_slice(batch) def _create_grouped_slice(self, batch: list[StreamSlice]) -> StreamSlice: + """ + Creates a grouped StreamSlice from a batch of partitions, aggregating extra fields into a dictionary with list values. + + Args: + batch (list[StreamSlice]): A list of StreamSlice objects to group. + + Returns: + StreamSlice: A single StreamSlice with combined partition and extra field values. 
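+
+        Example (illustrative):
+            grouping two slices whose extra_fields are {"name": "Board 0"} and
+            {"name": "Board 1"} produces a slice with extra_fields
+            {"name": ["Board 0", "Board 1"]}; keys missing from a given partition
+            are filled with None, as exercised by the unit tests in this series.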
+ """ # Combine partition values into a single dict with lists grouped_partition = { key: [p.partition.get(key) for p in batch] for key in batch[0].partition.keys() } + + # Aggregate extra fields into a dict with list values + extra_fields_dict = ( + { + key: [p.extra_fields.get(key) for p in batch] + for key in set().union(*(p.extra_fields.keys() for p in batch if p.extra_fields)) + } + if any(p.extra_fields for p in batch) + else {} + ) return StreamSlice( partition=grouped_partition, cursor_slice={}, # Cursor is managed by the underlying router or incremental sync + extra_fields=extra_fields_dict, ) def get_request_params( From b35a1658671fd9a723a970e57478eace1693bd89 Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Mon, 24 Feb 2025 17:44:45 +0200 Subject: [PATCH 04/15] Add unit tests --- .../test_grouping_partition_router.py | 460 ++++++++++++++++++ 1 file changed, 460 insertions(+) create mode 100644 unit_tests/sources/declarative/partition_routers/test_grouping_partition_router.py diff --git a/unit_tests/sources/declarative/partition_routers/test_grouping_partition_router.py b/unit_tests/sources/declarative/partition_routers/test_grouping_partition_router.py new file mode 100644 index 000000000..0340f1c9f --- /dev/null +++ b/unit_tests/sources/declarative/partition_routers/test_grouping_partition_router.py @@ -0,0 +1,460 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + +from typing import Any, Iterable, List, Mapping, Optional, Union +from unittest.mock import MagicMock + +import pytest +from test_substream_partition_router import ( + MockStream, + parent_slices, +) # Reuse MockStream and parent_slices + +from airbyte_cdk.sources.declarative.partition_routers import ( + GroupingPartitionRouter, + SubstreamPartitionRouter, +) +from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import ( + ParentStreamConfig, +) +from airbyte_cdk.sources.types import StreamSlice + + +@pytest.fixture +def mock_config(): + return {} + + +@pytest.fixture +def mock_underlying_router(mock_config): + """Fixture for a simple underlying router with predefined slices and extra fields.""" + parent_stream = MockStream( + slices=[{}], # Single empty slice, parent_partition will be {} + records=[ + {"board_id": 0, "name": "Board 0", "owner": "User0"}, + { + "board_id": 0, + "name": "Board 0 Duplicate", + "owner": "User0 Duplicate", + }, # Duplicate board_id + ] + + [{"board_id": i, "name": f"Board {i}", "owner": f"User{i}"} for i in range(1, 5)], + name="mock_parent", + ) + return SubstreamPartitionRouter( + parent_stream_configs=[ + ParentStreamConfig( + stream=parent_stream, + parent_key="board_id", + partition_field="board_ids", + config=mock_config, + parameters={}, + extra_fields=[["name"], ["owner"]], + ) + ], + config=mock_config, + parameters={}, + ) + + +@pytest.fixture +def mock_underlying_router_with_parent_slices(mock_config): + """Fixture with varied parent slices for testing non-empty parent_slice.""" + parent_stream = MockStream( + slices=parent_slices, # [{"slice": "first"}, {"slice": "second"}, {"slice": "third"}] + records=[ + {"board_id": 1, "name": "Board 1", "owner": "User1", "slice": "first"}, + {"board_id": 2, "name": "Board 2", "owner": "User2", "slice": "second"}, + {"board_id": 3, "name": "Board 3", "owner": "User3", "slice": "third"}, + ], + name="mock_parent", + ) + return SubstreamPartitionRouter( + parent_stream_configs=[ + ParentStreamConfig( + stream=parent_stream, + parent_key="board_id", + partition_field="board_ids", + 
config=mock_config, + parameters={}, + extra_fields=[["name"], ["owner"]], + ) + ], + config=mock_config, + parameters={}, + ) + + +@pytest.mark.parametrize( + "group_size, deduplicate, expected_slices", + [ + ( + 2, + True, + [ + StreamSlice( + partition={"board_ids": [0, 1], "parent_slice": [{}, {}]}, + cursor_slice={}, + extra_fields={"name": ["Board 0", "Board 1"], "owner": ["User0", "User1"]}, + ), + StreamSlice( + partition={"board_ids": [2, 3], "parent_slice": [{}, {}]}, + cursor_slice={}, + extra_fields={"name": ["Board 2", "Board 3"], "owner": ["User2", "User3"]}, + ), + StreamSlice( + partition={"board_ids": [4], "parent_slice": [{}]}, + cursor_slice={}, + extra_fields={"name": ["Board 4"], "owner": ["User4"]}, + ), + ], + ), + ( + 3, + True, + [ + StreamSlice( + partition={"board_ids": [0, 1, 2], "parent_slice": [{}, {}, {}]}, + cursor_slice={}, + extra_fields={ + "name": ["Board 0", "Board 1", "Board 2"], + "owner": ["User0", "User1", "User2"], + }, + ), + StreamSlice( + partition={"board_ids": [3, 4], "parent_slice": [{}, {}]}, + cursor_slice={}, + extra_fields={"name": ["Board 3", "Board 4"], "owner": ["User3", "User4"]}, + ), + ], + ), + ( + 2, + False, + [ + StreamSlice( + partition={"board_ids": [0, 0], "parent_slice": [{}, {}]}, + cursor_slice={}, + extra_fields={ + "name": ["Board 0", "Board 0 Duplicate"], + "owner": ["User0", "User0 Duplicate"], + }, + ), + StreamSlice( + partition={"board_ids": [1, 2], "parent_slice": [{}, {}]}, + cursor_slice={}, + extra_fields={"name": ["Board 1", "Board 2"], "owner": ["User1", "User2"]}, + ), + StreamSlice( + partition={"board_ids": [3, 4], "parent_slice": [{}, {}]}, + cursor_slice={}, + extra_fields={"name": ["Board 3", "Board 4"], "owner": ["User3", "User4"]}, + ), + ], + ), + ], + ids=["group_size_2_deduplicate", "group_size_3_deduplicate", "group_size_2_no_deduplicate"], +) +def test_stream_slices_grouping( + mock_config, mock_underlying_router, group_size, deduplicate, expected_slices +): + """Test basic grouping behavior with different group sizes and deduplication settings.""" + router = GroupingPartitionRouter( + group_size=group_size, + underlying_partition_router=mock_underlying_router, + deduplicate=deduplicate, + config=mock_config, + ) + slices = list(router.stream_slices()) + assert slices == expected_slices + + +def test_stream_slices_empty_underlying_router(mock_config): + """Test behavior when the underlying router yields no slices.""" + parent_stream = MockStream( + slices=[{}], + records=[], + name="mock_parent", + ) + underlying_router = SubstreamPartitionRouter( + parent_stream_configs=[ + ParentStreamConfig( + stream=parent_stream, + parent_key="board_id", + partition_field="board_ids", + config=mock_config, + parameters={}, + extra_fields=[["name"]], + ) + ], + config=mock_config, + parameters={}, + ) + router = GroupingPartitionRouter( + group_size=2, + underlying_partition_router=underlying_router, + config=mock_config, + ) + slices = list(router.stream_slices()) + assert slices == [] + + +def test_stream_slices_lazy_iteration(mock_config, mock_underlying_router): + """Test that stream_slices processes partitions lazily, iterating the underlying router as an iterator.""" + + # Custom iterator to track yields and simulate underlying stream_slices + class ControlledIterator: + def __init__(self): + self.slices = [ + StreamSlice( + partition={"board_ids": 0}, + cursor_slice={}, + extra_fields={"name": "Board 0", "owner": "User0"}, + ), + StreamSlice( + partition={"board_ids": 1}, + cursor_slice={}, + 
extra_fields={"name": "Board 1", "owner": "User1"}, + ), + StreamSlice( + partition={"board_ids": 2}, + cursor_slice={}, + extra_fields={"name": "Board 2", "owner": "User2"}, + ), + StreamSlice( + partition={"board_ids": 3}, + cursor_slice={}, + extra_fields={"name": "Board 3", "owner": "User3"}, + ), + StreamSlice( + partition={"board_ids": 4}, + cursor_slice={}, + extra_fields={"name": "Board 4", "owner": "User4"}, + ), + ] + self.index = 0 + self.yield_count = 0 + + def __iter__(self): + return self + + def __next__(self): + if self.index < len(self.slices): + self.yield_count += 1 + slice = self.slices[self.index] + self.index += 1 + return slice + raise StopIteration + + # Replace the underlying router's stream_slices with the controlled iterator + controlled_iter = ControlledIterator() + mock_underlying_router.stream_slices = MagicMock(return_value=controlled_iter) + + router = GroupingPartitionRouter( + group_size=2, + underlying_partition_router=mock_underlying_router, + config=mock_config, + deduplicate=True, + ) + slices_iter = router.stream_slices() + + # Before iteration, no slices should be yielded + assert controlled_iter.yield_count == 0, "No slices should be yielded before iteration starts" + + # Get the first slice + first_slice = next(slices_iter) + assert first_slice == StreamSlice( + partition={"board_ids": [0, 1]}, + cursor_slice={}, + extra_fields={"name": ["Board 0", "Board 1"], "owner": ["User0", "User1"]}, + ) + assert ( + controlled_iter.yield_count == 2 + ), "Only 2 slices should be yielded to form the first group" + + # Get the second slice + second_slice = next(slices_iter) + assert second_slice == StreamSlice( + partition={"board_ids": [2, 3]}, + cursor_slice={}, + extra_fields={"name": ["Board 2", "Board 3"], "owner": ["User2", "User3"]}, + ) + assert ( + controlled_iter.yield_count == 4 + ), "Only 4 slices should be yielded up to the second group" + + # Exhaust the iterator + remaining_slices = list(slices_iter) + assert remaining_slices == [ + StreamSlice( + partition={"board_ids": [4]}, + cursor_slice={}, + extra_fields={"name": ["Board 4"], "owner": ["User4"]}, + ) + ] + assert ( + controlled_iter.yield_count == 5 + ), "All 5 slices should be yielded after exhausting the iterator" + + +def test_set_initial_state_delegation(mock_config, mock_underlying_router): + """Test that set_initial_state delegates to the underlying router.""" + router = GroupingPartitionRouter( + group_size=2, + underlying_partition_router=mock_underlying_router, + config=mock_config, + ) + mock_state = {"some_key": "some_value"} + mock_underlying_router.set_initial_state = MagicMock() + + router.set_initial_state(mock_state) + mock_underlying_router.set_initial_state.assert_called_once_with(mock_state) + + +def test_get_stream_state_delegation(mock_config, mock_underlying_router): + """Test that get_stream_state delegates to the underlying router.""" + router = GroupingPartitionRouter( + group_size=2, + underlying_partition_router=mock_underlying_router, + config=mock_config, + ) + mock_state = {"state_key": "state_value"} + mock_underlying_router.get_stream_state = MagicMock(return_value=mock_state) + + state = router.get_stream_state() + assert state == mock_state + mock_underlying_router.get_stream_state.assert_called_once() + + +def test_stream_slices_extra_fields_varied(mock_config): + """Test grouping with varied extra fields across partitions.""" + parent_stream = MockStream( + slices=[{}], + records=[ + {"board_id": 1, "name": "Board 1", "owner": "User1"}, + {"board_id": 2, 
"name": "Board 2"}, # Missing owner + {"board_id": 3, "owner": "User3"}, # Missing name + ], + name="mock_parent", + ) + underlying_router = SubstreamPartitionRouter( + parent_stream_configs=[ + ParentStreamConfig( + stream=parent_stream, + parent_key="board_id", + partition_field="board_ids", + config=mock_config, + parameters={}, + extra_fields=[["name"], ["owner"]], + ) + ], + config=mock_config, + parameters={}, + ) + router = GroupingPartitionRouter( + group_size=2, + underlying_partition_router=underlying_router, + config=mock_config, + deduplicate=True, + ) + expected_slices = [ + StreamSlice( + partition={"board_ids": [1, 2], "parent_slice": [{}, {}]}, + cursor_slice={}, + extra_fields={"name": ["Board 1", "Board 2"], "owner": ["User1", None]}, + ), + StreamSlice( + partition={"board_ids": [3], "parent_slice": [{}]}, + cursor_slice={}, + extra_fields={"name": [None], "owner": ["User3"]}, + ), + ] + slices = list(router.stream_slices()) + assert slices == expected_slices + + +def test_grouping_with_complex_partitions_and_extra_fields(mock_config): + """Test grouping with partitions containing multiple keys and extra fields.""" + parent_stream = MockStream( + slices=[{}], + records=[{"board_id": i, "extra": f"extra_{i}", "name": f"Board {i}"} for i in range(3)], + name="mock_parent", + ) + underlying_router = SubstreamPartitionRouter( + parent_stream_configs=[ + ParentStreamConfig( + stream=parent_stream, + parent_key="board_id", + partition_field="board_ids", + config=mock_config, + parameters={}, + extra_fields=[["extra"], ["name"]], + ) + ], + config=mock_config, + parameters={}, + ) + router = GroupingPartitionRouter( + group_size=2, + underlying_partition_router=underlying_router, + config=mock_config, + ) + expected_slices = [ + StreamSlice( + partition={"board_ids": [0, 1], "parent_slice": [{}, {}]}, + cursor_slice={}, + extra_fields={"extra": ["extra_0", "extra_1"], "name": ["Board 0", "Board 1"]}, + ), + StreamSlice( + partition={"board_ids": [2], "parent_slice": [{}]}, + cursor_slice={}, + extra_fields={"extra": ["extra_2"], "name": ["Board 2"]}, + ), + ] + slices = list(router.stream_slices()) + assert slices == expected_slices + + +def test_stream_slices_with_non_empty_parent_slice( + mock_config, mock_underlying_router_with_parent_slices +): + """Test grouping with non-empty parent_slice values from the underlying router.""" + router = GroupingPartitionRouter( + group_size=2, + underlying_partition_router=mock_underlying_router_with_parent_slices, + config=mock_config, + deduplicate=True, + ) + expected_slices = [ + StreamSlice( + partition={ + "board_ids": [1, 2], + "parent_slice": [{"slice": "first"}, {"slice": "second"}], + }, + cursor_slice={}, + extra_fields={"name": ["Board 1", "Board 2"], "owner": ["User1", "User2"]}, + ), + StreamSlice( + partition={"board_ids": [3], "parent_slice": [{"slice": "third"}]}, + cursor_slice={}, + extra_fields={"name": ["Board 3"], "owner": ["User3"]}, + ), + ] + slices = list(router.stream_slices()) + assert slices == expected_slices + + +def test_get_request_params_default(mock_config, mock_underlying_router): + """Test that get_request_params returns an empty dict by default.""" + router = GroupingPartitionRouter( + group_size=2, + underlying_partition_router=mock_underlying_router, + config=mock_config, + ) + params = router.get_request_params( + stream_slice=StreamSlice( + partition={"board_ids": [1, 2], "parent_slice": [{}, {}]}, cursor_slice={} + ) + ) + assert params == {} From 2348d255ad9e789104f2d95cd4a6aab88918400a Mon Sep 
17 00:00:00 2001 From: Anatolii Yatsuk Date: Mon, 24 Feb 2025 18:24:12 +0200 Subject: [PATCH 05/15] Fix import in unit tests --- .../partition_routers/test_grouping_partition_router.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/unit_tests/sources/declarative/partition_routers/test_grouping_partition_router.py b/unit_tests/sources/declarative/partition_routers/test_grouping_partition_router.py index 0340f1c9f..34a862bf6 100644 --- a/unit_tests/sources/declarative/partition_routers/test_grouping_partition_router.py +++ b/unit_tests/sources/declarative/partition_routers/test_grouping_partition_router.py @@ -6,7 +6,7 @@ from unittest.mock import MagicMock import pytest -from test_substream_partition_router import ( +from unit_tests.sources.declarative.partition_routers.test_substream_partition_router import ( MockStream, parent_slices, ) # Reuse MockStream and parent_slices From cf782a2af3461ced2b8fd03453710e1bdcd74da0 Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Mon, 24 Feb 2025 18:24:47 +0200 Subject: [PATCH 06/15] Fix formatting --- .../models/declarative_component_schema.py | 78 +++++++------------ 1 file changed, 30 insertions(+), 48 deletions(-) diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 4bc661612..d65292d0c 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -609,9 +609,7 @@ class OAuthAuthenticator(BaseModel): scopes: Optional[List[str]] = Field( None, description="List of scopes that should be granted to the access token.", - examples=[ - ["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"] - ], + examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]], title="Scopes", ) token_expiry_date: Optional[str] = Field( @@ -1080,28 +1078,24 @@ class OAuthConfigSpecification(BaseModel): class Config: extra = Extra.allow - oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = ( - Field( - None, - description="OAuth specific blob. 
This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", - examples=[ - {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, - { - "app_id": { - "type": "string", - "path_in_connector_config": ["info", "app_id"], - } - }, - ], - title="OAuth user input", - ) + oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field( + None, + description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", + examples=[ + {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, + { + "app_id": { + "type": "string", + "path_in_connector_config": ["info", "app_id"], + } + }, + ], + title="OAuth user input", ) - oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = ( - Field( - None, - description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + 
codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', - title="DeclarativeOAuth Connector Specification", - ) + oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field( + None, + description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', + title="DeclarativeOAuth Connector Specification", ) complete_oauth_output_specification: Optional[Dict[str, Any]] = Field( None, @@ -1119,9 +1113,7 @@ class Config: complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field( None, description="OAuth specific blob. 
This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }", - examples=[ - {"client_id": {"type": "string"}, "client_secret": {"type": "string"}} - ], + examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}], title="OAuth input specification", ) complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field( @@ -1752,9 +1744,7 @@ class RecordSelector(BaseModel): description="Responsible for filtering records to be emitted by the Source.", title="Record Filter", ) - schema_normalization: Optional[ - Union[SchemaNormalization, CustomSchemaNormalization] - ] = Field( + schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field( SchemaNormalization.None_, description="Responsible for normalization according to the schema.", title="Schema Normalization", @@ -1958,16 +1948,12 @@ class Config: description="Component used to coordinate how records are extracted across stream slices and request pages.", title="Retriever", ) - incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = ( - Field( - None, - description="Component used to fetch data incrementally based on a time field in the data.", - title="Incremental Sync", - ) - ) - name: Optional[str] = Field( - "", description="The stream name.", example=["Users"], title="Name" + incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = Field( + None, + description="Component used to fetch data incrementally based on a time field in the data.", + title="Incremental Sync", ) + name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name") primary_key: Optional[PrimaryKey] = Field( "", description="The primary key of the stream.", title="Primary Key" ) @@ -2291,9 +2277,7 @@ class AsyncRetriever(BaseModel): ) download_extractor: Optional[ Union[CustomRecordExtractor, DpathExtractor, ResponseToFileExtractor] - ] = Field( - None, description="Responsible for fetching the records from provided urls." 
- ) + ] = Field(None, description="Responsible for fetching the records from provided urls.") creation_requester: Union[CustomRequester, HttpRequester] = Field( ..., description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.", @@ -2426,12 +2410,10 @@ class DynamicDeclarativeStream(BaseModel): stream_template: DeclarativeStream = Field( ..., description="Reference to the stream template.", title="Stream Template" ) - components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = ( - Field( - ..., - description="Component resolve and populates stream templates with components values.", - title="Components Resolver", - ) + components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = Field( + ..., + description="Component resolve and populates stream templates with components values.", + title="Components Resolver", ) From 4d8c918a979d0786335d90617aa4579dabcbb2c2 Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Mon, 24 Feb 2025 18:40:04 +0200 Subject: [PATCH 07/15] Add unit test for model_to_component_factory.py --- .../parsers/model_to_component_factory.py | 2 +- .../test_model_to_component_factory.py | 60 +++++++++++++++++++ .../test_grouping_partition_router.py | 8 +-- 3 files changed, 65 insertions(+), 5 deletions(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 125fff1e2..b22ecdbc5 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -628,6 +628,7 @@ def _init_mappings(self) -> None: UnlimitedCallRatePolicyModel: self.create_unlimited_call_rate_policy, RateModel: self.create_rate, HttpRequestRegexMatcherModel: self.create_http_request_matcher, + GroupingPartitionRouterModel: self.create_grouping_partition_router, } # Needed for the case where we need to perform a second parse on the fields of a custom component @@ -3065,6 +3066,5 @@ def create_grouping_partition_router( group_size=model.group_size, underlying_partition_router=underlying_router, deduplicate=model.deduplicate if model.deduplicate is not None else True, - parameters=model.parameters or {}, config=config, ) diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index faab999cb..34d3b27f3 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -65,6 +65,9 @@ from airbyte_cdk.sources.declarative.models import DatetimeBasedCursor as DatetimeBasedCursorModel from airbyte_cdk.sources.declarative.models import DeclarativeStream as DeclarativeStreamModel from airbyte_cdk.sources.declarative.models import DefaultPaginator as DefaultPaginatorModel +from airbyte_cdk.sources.declarative.models import ( + GroupingPartitionRouter as GroupingPartitionRouterModel, +) from airbyte_cdk.sources.declarative.models import HttpRequester as HttpRequesterModel from airbyte_cdk.sources.declarative.models import JwtAuthenticator as JwtAuthenticatorModel from airbyte_cdk.sources.declarative.models import ListPartitionRouter as ListPartitionRouterModel @@ -96,6 +99,7 @@ from airbyte_cdk.sources.declarative.partition_routers import ( AsyncJobPartitionRouter, CartesianProductStreamSlicer, + 

From 6cb895e99131ccfe0ea29384fdde0d91862796d0 Mon Sep 17 00:00:00 2001
From: Anatolii Yatsuk
Date: Mon, 24 Feb 2025 19:42:04 +0200
Subject: [PATCH 08/15] Fix mypy errors

---
 .../sources/declarative/declarative_component_schema.yaml     | 1 -
 .../declarative/models/declarative_component_schema.py        | 4 ++--
 .../sources/declarative/parsers/model_to_component_factory.py | 2 ++
 .../partition_routers/grouping_partition_router.py            | 2 +-
 4 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml
index 2aecc676a..805b3a83a 100644
--- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml
+++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml
@@ -3436,7 +3436,6 @@ definitions:
         title: Group Size
         description: The number of partitions to include in each group. This determines how many partition values are batched together in a single slice.
         type: integer
-        minimum: 1
         examples:
           - 10
           - 50
diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py
index d65292d0c..8fa7ed354 100644
--- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py
+++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py
@@ -6,7 +6,7 @@
 from enum import Enum
 from typing import Any, Dict, List, Literal, Optional, Union
 
-from pydantic.v1 import BaseModel, Extra, Field, conint
+from pydantic.v1 import BaseModel, Extra, Field
 
 
 class AuthFlowType(Enum):
@@ -2373,7 +2373,7 @@ class SubstreamPartitionRouter(BaseModel):
 
 class GroupingPartitionRouter(BaseModel):
     type: Literal["GroupingPartitionRouter"]
-    group_size: conint(ge=1) = Field(
+    group_size: int = Field(
         ...,
         description="The number of partitions to include in each group. This determines how many partition values are batched together in a single slice.",
         examples=[10, 50],
         title="Group Size",
     )
diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
index b22ecdbc5..d16ad7d07 100644
--- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
+++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
@@ -3056,6 +3056,8 @@ def create_grouping_partition_router(
         underlying_router = self._create_component_from_model(
             model=model.underlying_partition_router, config=config
         )
+        if model.group_size < 1:
+            raise ValueError(f"Group size must be greater than 0, got {model.group_size}")
 
         if not isinstance(underlying_router, PartitionRouter):
             raise ValueError(
diff --git a/airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py b/airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py
index bea875559..f2206add7 100644
--- a/airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py
+++ b/airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py
@@ -40,7 +40,7 @@ def stream_slices(self) -> Iterable[StreamSlice]:
             Iterable[StreamSlice]: An iterable of StreamSlice objects, where each slice contains a batch of partition values.
         """
         batch = []
-        seen_keys = set() if self.deduplicate else None
+        seen_keys = set()
 
         # Iterate over partitions lazily from the underlying router
         for partition in self.underlying_partition_router.stream_slices():
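
A note on the trade-off in the mypy fix above: `conint(ge=1)` is a call expression, and mypy rejects call expressions used as annotations even though pydantic v1 honors them at runtime, which is presumably why both the schema-level `minimum: 1` and the `conint` bound were swapped for an explicit runtime guard in the factory. A hedged sketch of the two styles (class names illustrative):

    # Two ways to enforce group_size >= 1 (sketch mirroring the change above).
    from pydantic.v1 import BaseModel, conint

    class WithConstrainedType(BaseModel):
        group_size: conint(ge=1)  # works at runtime, but mypy rejects the annotation

    class WithPlainInt(BaseModel):
        group_size: int  # type-checks cleanly; the bound moves to the factory

    def check_group_size(model: WithPlainInt) -> None:
        # Explicit guard, as create_grouping_partition_router now does.
        if model.group_size < 1:
            raise ValueError(f"Group size must be greater than 0, got {model.group_size}")

One consequence worth noting: the JSON schema no longer advertises the lower bound, so an invalid manifest now fails at component-build time rather than at schema validation.
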
""" batch = [] - seen_keys = set() if self.deduplicate else None + seen_keys = set() # Iterate over partitions lazily from the underlying router for partition in self.underlying_partition_router.stream_slices(): From fd9b2255a1621f4aa6ebd9df407cd475805f0df7 Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Mon, 24 Feb 2025 19:53:35 +0200 Subject: [PATCH 09/15] Add unit tests for error scenarios --- .../parsers/model_to_component_factory.py | 7 ++ .../test_model_to_component_factory.py | 97 +++++++++++++++++++ 2 files changed, 104 insertions(+) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index d16ad7d07..fcda82b87 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -3064,6 +3064,13 @@ def create_grouping_partition_router( f"Underlying partition router must be a PartitionRouter subclass, got {type(underlying_router)}" ) + if isinstance(underlying_router, SubstreamPartitionRouter): + if any( + parent_config.request_option + for parent_config in underlying_router.parent_stream_configs + ): + raise ValueError("Request options are not supported for GroupingPartitionRouter.") + return GroupingPartitionRouter( group_size=model.group_size, underlying_partition_router=underlying_router, diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index 34d3b27f3..0ec34b6d9 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -3900,3 +3900,100 @@ def test_create_grouping_partition_router_with_underlying_router(): assert isinstance(parent_stream_configs[0].stream, DeclarativeStream) assert parent_stream_configs[0].parent_key.eval({}) == "id" assert parent_stream_configs[0].partition_field.eval({}) == "repository_id" + + +def test_create_grouping_partition_router_invalid_group_size(): + """Test that an invalid group_size (< 1) raises a ValueError.""" + content = """ + schema_loader: + file_path: "./source_example/schemas/{{ parameters['name'] }}.yaml" + name: "{{ parameters['stream_name'] }}" + retriever: + requester: + type: "HttpRequester" + path: "example" + record_selector: + extractor: + field_path: [] + stream_A: + type: DeclarativeStream + name: "A" + primary_key: "id" + $parameters: + retriever: "#/retriever" + url_base: "https://airbyte.io" + schema_loader: "#/schema_loader" + sub_partition_router: + type: SubstreamPartitionRouter + parent_stream_configs: + - stream: "#/stream_A" + parent_key: id + partition_field: repository_id + partition_router: + type: GroupingPartitionRouter + underlying_partition_router: "#/sub_partition_router" + group_size: 0 + """ + parsed_manifest = YamlDeclarativeSource._parse(content) + resolved_manifest = resolver.preprocess_manifest(parsed_manifest) + partition_router_manifest = transformer.propagate_types_and_parameters( + "", resolved_manifest["partition_router"], {} + ) + + with pytest.raises(ValueError, match="Group size must be greater than 0, got 0"): + factory.create_component( + model_type=GroupingPartitionRouterModel, + component_definition=partition_router_manifest, + config=input_config, + ) + + +def test_create_grouping_partition_router_substream_with_request_option(): + """Test that a SubstreamPartitionRouter with 
+    content = """
+    schema_loader:
+      file_path: "./source_example/schemas/{{ parameters['name'] }}.yaml"
+      name: "{{ parameters['stream_name'] }}"
+    retriever:
+      requester:
+        type: "HttpRequester"
+        path: "example"
+      record_selector:
+        extractor:
+          field_path: []
+    stream_A:
+      type: DeclarativeStream
+      name: "A"
+      primary_key: "id"
+      $parameters:
+        retriever: "#/retriever"
+        url_base: "https://airbyte.io"
+        schema_loader: "#/schema_loader"
+    sub_partition_router:
+      type: SubstreamPartitionRouter
+      parent_stream_configs:
+        - stream: "#/stream_A"
+          parent_key: id
+          partition_field: repository_id
+          request_option:
+            inject_into: request_parameter
+            field_name: "repo_id"
+    partition_router:
+      type: GroupingPartitionRouter
+      underlying_partition_router: "#/sub_partition_router"
+      group_size: 2
+    """
+    parsed_manifest = YamlDeclarativeSource._parse(content)
+    resolved_manifest = resolver.preprocess_manifest(parsed_manifest)
+    partition_router_manifest = transformer.propagate_types_and_parameters(
+        "", resolved_manifest["partition_router"], {}
+    )
+
+    with pytest.raises(
+        ValueError, match="Request options are not supported for GroupingPartitionRouter."
+    ):
+        factory.create_component(
+            model_type=GroupingPartitionRouterModel,
+            component_definition=partition_router_manifest,
+            config=input_config,
+        )

From 01fb6c8ede4f9e70877bb626beefaeb97036bcd0 Mon Sep 17 00:00:00 2001
From: Anatolii Yatsuk
Date: Wed, 26 Feb 2025 16:13:22 +0200
Subject: [PATCH 10/15] Fix comments from review

---
 .../parsers/model_to_component_factory.py | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
index fcda82b87..474b15c09 100644
--- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
+++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
@@ -3059,11 +3059,10 @@ def create_grouping_partition_router(
         if model.group_size < 1:
             raise ValueError(f"Group size must be greater than 0, got {model.group_size}")
 
-        if not isinstance(underlying_router, PartitionRouter):
-            raise ValueError(
-                f"Underlying partition router must be a PartitionRouter subclass, got {type(underlying_router)}"
-            )
-
+        # Request options in underlying partition routers are not supported for GroupingPartitionRouter
+        # because they are specific to individual partitions and cannot be aggregated or handled
+        # when grouping, potentially leading to incorrect API calls. Any request customization
+        # should be managed at the stream level or through the retriever's configuration.
         if isinstance(underlying_router, SubstreamPartitionRouter):
             if any(
                 parent_config.request_option
                 for parent_config in underlying_router.parent_stream_configs
             ):
                 raise ValueError("Request options are not supported for GroupingPartitionRouter.")
 
+        if isinstance(underlying_router, ListPartitionRouter):
+            if underlying_router.request_option:
+                raise ValueError("Request options are not supported for GroupingPartitionRouter.")
+
         return GroupingPartitionRouter(
             group_size=model.group_size,
             underlying_partition_router=underlying_router,

From 1a1d407c79a9f406fd8a89647f0f0292248ca704 Mon Sep 17 00:00:00 2001
From: Anatolii Yatsuk
Date: Wed, 26 Feb 2025 16:14:10 +0200
Subject: [PATCH 11/15] Fix comment

---
 .../sources/declarative/parsers/model_to_component_factory.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
index 474b15c09..7365e3f96 100644
--- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
+++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
@@ -3062,7 +3062,7 @@ def create_grouping_partition_router(
         # Request options in underlying partition routers are not supported for GroupingPartitionRouter
         # because they are specific to individual partitions and cannot be aggregated or handled
         # when grouping, potentially leading to incorrect API calls. Any request customization
-        # should be managed at the stream level or through the retriever's configuration.
+        # should be managed at the stream level through the requester's configuration.
         if isinstance(underlying_router, SubstreamPartitionRouter):
             if any(
                 parent_config.request_option

From e7bb9c04259741d2064a795a7bb5fa8c37cc4ac3 Mon Sep 17 00:00:00 2001
From: Anatolii Yatsuk
Date: Tue, 18 Mar 2025 13:31:39 +0200
Subject: [PATCH 12/15] Add correct state handling

---
 .../partition_routers/grouping_partition_router.py | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py b/airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py
index f2206add7..e2f419f4d 100644
--- a/airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py
+++ b/airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py
@@ -28,6 +28,9 @@ class GroupingPartitionRouter(PartitionRouter):
     config: Config
     deduplicate: bool = True
 
+    def __post_init__(self) -> None:
+        self._state: Optional[Mapping[str, StreamState]] = {}
+
     def stream_slices(self) -> Iterable[StreamSlice]:
         """
         Lazily groups partitions from the underlying partition router into batches of size `group_size`.
@@ -58,9 +61,11 @@ def stream_slices(self) -> Iterable[StreamSlice]:
 
             # Yield the batch when it reaches the group_size
             if len(batch) == self.group_size:
+                self._state = self.underlying_partition_router.get_stream_state()
                 yield self._create_grouped_slice(batch)
                 batch = []  # Reset the batch
 
+        self._state = self.underlying_partition_router.get_stream_state()
         # Yield any remaining partitions if the batch isn't empty
         if batch:
             yield self._create_grouped_slice(batch)
@@ -130,7 +135,8 @@ def get_request_body_json(
     def set_initial_state(self, stream_state: StreamState) -> None:
         """Delegate state initialization to the underlying partition router."""
         self.underlying_partition_router.set_initial_state(stream_state)
+        self._state = self.underlying_partition_router.get_stream_state()
 
     def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
         """Delegate state retrieval to the underlying partition router."""
-        return self.underlying_partition_router.get_stream_state()
+        return self._state
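
The checkpointing rule the patch above implements deserves a gloss: `_state` is refreshed only once a full group has been emitted, and once more after the loop for the final partial batch, so a persisted checkpoint always covers exactly the partitions that have already been handed out in grouped slices. A stripped-down sketch of the pattern (a standalone illustration, not the CDK API):

    # Sketch: snapshot upstream state only at group boundaries (illustrative).
    from typing import Any, Callable, Iterable, Iterator, List, Tuple

    def grouped_with_checkpoints(
        partitions: Iterable[Any],
        get_state: Callable[[], Any],
        group_size: int,
    ) -> Iterator[Tuple[List[Any], Any]]:
        batch: List[Any] = []
        for partition in partitions:
            batch.append(partition)
            if len(batch) == group_size:
                # The upstream state now covers everything up to and including
                # this group, so it is safe to expose as a checkpoint.
                yield batch, get_state()
                batch = []
        if batch:
            yield batch, get_state()  # flush the final partial group

Snapshotting mid-group would instead let a checkpoint point past partitions that were buffered but never yielded, losing them on resume.
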

From 85d041b365504eaf692a13a4f99d7f88a8a3fbba Mon Sep 17 00:00:00 2001
From: Anatolii Yatsuk
Date: Tue, 18 Mar 2025 13:32:15 +0200
Subject: [PATCH 13/15] Force global state for GroupingPartitionRouter

---
 .../declarative_component_schema.yaml     |  4 +-
 .../concurrent_partition_cursor.py        |  3 +-
 .../parsers/model_to_component_factory.py |  4 +
 .../test_grouping_partition_router.py     | 84 +++++++++++++++----
 4 files changed, 77 insertions(+), 18 deletions(-)

diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml
index 69a6b00f6..2a1b23242 100644
--- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml
+++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml
@@ -2894,7 +2894,7 @@ definitions:
         title: Lazy Read Pointer
         description: If set, this will enable lazy reading, using the initial read of parent records to extract child records.
         type: array
-        default: [ ]
+        default: []
         items:
           - type: string
         interpolation_context:
@@ -3199,7 +3199,7 @@ definitions:
     properties:
       type:
         type: string
-        enum: [ StateDelegatingStream ]
+        enum: [StateDelegatingStream]
      name:
        title: Name
        description: The stream name.
diff --git a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py
index 715589026..a0c541dc4 100644
--- a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py
+++ b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py
@@ -79,6 +79,7 @@ def __init__(
         connector_state_manager: ConnectorStateManager,
         connector_state_converter: AbstractStreamStateConverter,
         cursor_field: CursorField,
+        use_global_cursor: bool = False,
     ) -> None:
         self._global_cursor: Optional[StreamState] = {}
         self._stream_name = stream_name
@@ -106,7 +107,7 @@ def __init__(
         self._lookback_window: int = 0
         self._parent_state: Optional[StreamState] = None
         self._number_of_partitions: int = 0
-        self._use_global_cursor: bool = False
+        self._use_global_cursor: bool = use_global_cursor
         self._partition_serializer = PerPartitionKeySerializer()
         # Track the last time a state message was emitted
         self._last_emission_time: float = 0.0
diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
index 13aa4dfa4..7dd8f50b9 100644
--- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
+++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
@@ -1360,6 +1360,9 @@ def create_concurrent_cursor_from_perpartition_cursor(
         )
         stream_state = self.apply_stream_state_migrations(stream_state_migrations, stream_state)
 
+        # Per-partition state doesn't make sense for GroupingPartitionRouter, so force the global state
+        use_global_cursor = isinstance(partition_router, GroupingPartitionRouter)
+
         # Return the concurrent cursor and state converter
         return ConcurrentPerPartitionCursor(
             cursor_factory=cursor_factory,
@@ -1371,6 +1374,7 @@
             connector_state_manager=state_manager,
             connector_state_converter=connector_state_converter,
             cursor_field=cursor_field,
+            use_global_cursor=use_global_cursor,
         )
 
     @staticmethod
diff --git a/unit_tests/sources/declarative/partition_routers/test_grouping_partition_router.py b/unit_tests/sources/declarative/partition_routers/test_grouping_partition_router.py
index 1cc53d300..a75a48966 100644
--- a/unit_tests/sources/declarative/partition_routers/test_grouping_partition_router.py
+++ b/unit_tests/sources/declarative/partition_routers/test_grouping_partition_router.py
@@ -312,21 +312,6 @@ def test_set_initial_state_delegation(mock_config, mock_underlying_router):
     mock_underlying_router.set_initial_state.assert_called_once_with(mock_state)
 
 
-def test_get_stream_state_delegation(mock_config, mock_underlying_router):
-    """Test that get_stream_state delegates to the underlying router."""
-    router = GroupingPartitionRouter(
-        group_size=2,
-        underlying_partition_router=mock_underlying_router,
-        config=mock_config,
-    )
-    mock_state = {"state_key": "state_value"}
-    mock_underlying_router.get_stream_state = MagicMock(return_value=mock_state)
-
-    state = router.get_stream_state()
-    assert state == mock_state
-    mock_underlying_router.get_stream_state.assert_called_once()
-
-
 def test_stream_slices_extra_fields_varied(mock_config):
     """Test grouping with varied extra fields across partitions."""
     parent_stream = MockStream(
@@ -458,3 +443,72 @@
         )
     )
     assert params == {}
+
+
+def test_stream_slices_resume_from_state(mock_config, mock_underlying_router):
+    """Test that stream_slices resumes correctly from a previous state."""
+
+    # Simulate underlying router state handling
+    class MockPartitionRouter:
+        def __init__(self):
+            self.slices = [
+                StreamSlice(
+                    partition={"board_ids": i},
+                    cursor_slice={},
+                    extra_fields={"name": f"Board {i}", "owner": f"User{i}"},
+                )
+                for i in range(5)
+            ]
+            self.state = {"last_board_id": 0}  # Initial state
+
+        def set_initial_state(self, state):
+            self.state = state
+
+        def get_stream_state(self):
+            return self.state
+
+        def stream_slices(self):
+            last_board_id = self.state.get("last_board_id", -1)
+            for slice in self.slices:
+                board_id = slice.partition["board_ids"]
+                if board_id <= last_board_id:
+                    continue
+                self.state = {"last_board_id": board_id}
+                yield slice
+
+    underlying_router = MockPartitionRouter()
+    router = GroupingPartitionRouter(
+        group_size=2,
+        underlying_partition_router=underlying_router,
+        config=mock_config,
+        deduplicate=True,
+    )
+
+    # First sync: process first two slices
+    router.set_initial_state({"last_board_id": 0})
+    slices_iter = router.stream_slices()
+    first_batch = next(slices_iter)
+    assert first_batch == StreamSlice(
+        partition={"board_ids": [1, 2]},
+        cursor_slice={},
+        extra_fields={"name": ["Board 1", "Board 2"], "owner": ["User1", "User2"]},
+    )
+    state_after_first = router.get_stream_state()
+    assert state_after_first == {"last_board_id": 2}, "State should reflect last processed board_id"
+
+    # Simulate a new sync resuming from the previous state
+    new_router = GroupingPartitionRouter(
+        group_size=2,
+        underlying_partition_router=MockPartitionRouter(),
+        config=mock_config,
+        deduplicate=True,
+    )
+    new_router.set_initial_state(state_after_first)
+    resumed_slices = list(new_router.stream_slices())
+    assert resumed_slices == [
+        StreamSlice(
+            partition={"board_ids": [3, 4]},
+            cursor_slice={},
+            extra_fields={"name": ["Board 3", "Board 4"], "owner": ["User3", "User4"]},
+        )
+    ], "Should resume from board_id 3"
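
An illustration of why the patch above forces the global cursor (values hypothetical): `ConcurrentPerPartitionCursor` keys per-partition state by the serialized partition via `PerPartitionKeySerializer`, and for a grouped slice that key is the whole batch, which is not stable from one sync to the next.

    # Illustrative only: why per-partition state keys break under grouping.
    first_sync_partition = {"board_ids": [1, 2]}   # state stored under this batch key
    # A new board appearing upstream shifts every later batch:
    second_sync_partition = {"board_ids": [0, 1]}  # lookup misses; old state is orphaned
    # Hence use_global_cursor=True for GroupingPartitionRouter: one shared cursor
    # value replaces per-batch entries that could never be re-matched reliably.

This is presumably also why `test_get_stream_state_delegation` is dropped above: after the previous patch, state is cached at group boundaries rather than read through on demand.
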
"""Test that stream_slices resumes correctly from a previous state.""" + + # Simulate underlying router state handling + class MockPartitionRouter: + def __init__(self): + self.slices = [ + StreamSlice( + partition={"board_ids": i}, + cursor_slice={}, + extra_fields={"name": f"Board {i}", "owner": f"User{i}"}, + ) + for i in range(5) + ] + self.state = {"last_board_id": 0} # Initial state + + def set_initial_state(self, state): + self.state = state + + def get_stream_state(self): + return self.state + + def stream_slices(self): + last_board_id = self.state.get("last_board_id", -1) + for slice in self.slices: + board_id = slice.partition["board_ids"] + if board_id <= last_board_id: + continue + self.state = {"last_board_id": board_id} + yield slice + + underlying_router = MockPartitionRouter() + router = GroupingPartitionRouter( + group_size=2, + underlying_partition_router=underlying_router, + config=mock_config, + deduplicate=True, + ) + + # First sync: process first two slices + router.set_initial_state({"last_board_id": 0}) + slices_iter = router.stream_slices() + first_batch = next(slices_iter) + assert first_batch == StreamSlice( + partition={"board_ids": [1, 2]}, + cursor_slice={}, + extra_fields={"name": ["Board 1", "Board 2"], "owner": ["User1", "User2"]}, + ) + state_after_first = router.get_stream_state() + assert state_after_first == {"last_board_id": 2}, "State should reflect last processed board_id" + + # Simulate a new sync resuming from the previous state + new_router = GroupingPartitionRouter( + group_size=2, + underlying_partition_router=MockPartitionRouter(), + config=mock_config, + deduplicate=True, + ) + new_router.set_initial_state(state_after_first) + resumed_slices = list(new_router.stream_slices()) + assert resumed_slices == [ + StreamSlice( + partition={"board_ids": [3, 4]}, + cursor_slice={}, + extra_fields={"name": ["Board 3", "Board 4"], "owner": ["User3", "User4"]}, + ) + ], "Should resume from board_id 3" From 93cf934fe4bd938097c6ab71f31885597f375f45 Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Tue, 18 Mar 2025 16:08:49 +0200 Subject: [PATCH 14/15] Add error for partition with multiple values --- .../partition_routers/grouping_partition_router.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py b/airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py index e2f419f4d..d6e73ba34 100644 --- a/airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py +++ b/airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py @@ -48,6 +48,10 @@ def stream_slices(self) -> Iterable[StreamSlice]: # Iterate over partitions lazily from the underlying router for partition in self.underlying_partition_router.stream_slices(): # Extract the partition key (assuming single key-value pair, e.g., {"board_ids": value}) + if len(partition.partition.values()) != 1: + raise ValueError( + f"GroupingPartitionRouter expects a single partition key-value pair. 

From e152d70b2938a08ed6f3deaee55932615075f471 Mon Sep 17 00:00:00 2001
From: Anatolii Yatsuk
Date: Tue, 18 Mar 2025 16:19:06 +0200
Subject: [PATCH 15/15] Fix error for partition with multiple values

---
 .../partition_routers/grouping_partition_router.py | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py b/airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py
index d6e73ba34..a08acbbea 100644
--- a/airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py
+++ b/airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py
@@ -48,11 +48,15 @@ def stream_slices(self) -> Iterable[StreamSlice]:
         # Iterate over partitions lazily from the underlying router
         for partition in self.underlying_partition_router.stream_slices():
             # Extract the partition key (assuming single key-value pair, e.g., {"board_ids": value})
-            if len(partition.partition.values()) != 1:
+            partition_keys = list(partition.partition.keys())
+            # skip parent_slice as it is part of SubstreamPartitionRouter partition
+            if "parent_slice" in partition_keys:
+                partition_keys.remove("parent_slice")
+            if len(partition_keys) != 1:
                 raise ValueError(
                     f"GroupingPartitionRouter expects a single partition key-value pair. Got {partition.partition}"
                 )
-            key = next(iter(partition.partition.values()), None)
+            key = partition.partition[partition_keys[0]]
 
             # Skip duplicates if deduplication is enabled
             if self.deduplicate and key in seen_keys:
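
Taken together, the series yields a component that can be wired into a manifest roughly as follows. This is a sketch only: the parent stream and field names are illustrative, while the `GroupingPartitionRouter` type and its `group_size`, `deduplicate`, and `underlying_partition_router` properties follow the schema and tests above.

    partition_router:
      type: GroupingPartitionRouter
      group_size: 10        # batch ten partition values per slice
      deduplicate: true     # the default; drops repeated partition keys
      underlying_partition_router:
        type: SubstreamPartitionRouter
        parent_stream_configs:
          - stream: "#/definitions/boards_stream"  # hypothetical parent stream
            parent_key: id
            partition_field: board_ids

With these settings, ten upstream slices such as {"board_ids": 1, "parent_slice": {}} through {"board_ids": 10, "parent_slice": {}} are emitted as a single slice whose partition is {"board_ids": [1, 2, ..., 10]}. Because request options on the underlying router are rejected (patches 09 and 10), the requester itself is responsible for interpolating the grouped list into the API request.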