feat(low code): Add GroupingPartitionRouter #354

Merged
merged 16 commits into from
Mar 20, 2025
46 changes: 44 additions & 2 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Expand Up @@ -2894,7 +2894,7 @@ definitions:
title: Lazy Read Pointer
description: If set, this will enable lazy reading, using the initial read of parent records to extract child records.
type: array
default: [ ]
default: []
items:
- type: string
interpolation_context:
Expand Down Expand Up @@ -3199,7 +3199,7 @@ definitions:
properties:
type:
type: string
enum: [ StateDelegatingStream ]
enum: [StateDelegatingStream]
name:
title: Name
description: The stream name.
Expand Down Expand Up @@ -3254,12 +3254,14 @@ definitions:
- "$ref": "#/definitions/CustomPartitionRouter"
- "$ref": "#/definitions/ListPartitionRouter"
- "$ref": "#/definitions/SubstreamPartitionRouter"
- "$ref": "#/definitions/GroupingPartitionRouter"
- type: array
items:
anyOf:
- "$ref": "#/definitions/CustomPartitionRouter"
- "$ref": "#/definitions/ListPartitionRouter"
- "$ref": "#/definitions/SubstreamPartitionRouter"
- "$ref": "#/definitions/GroupingPartitionRouter"
decoder:
title: Decoder
description: Component decoding the response so records can be extracted.
Expand Down Expand Up @@ -3414,12 +3416,14 @@ definitions:
- "$ref": "#/definitions/CustomPartitionRouter"
- "$ref": "#/definitions/ListPartitionRouter"
- "$ref": "#/definitions/SubstreamPartitionRouter"
- "$ref": "#/definitions/GroupingPartitionRouter"
- type: array
items:
anyOf:
- "$ref": "#/definitions/CustomPartitionRouter"
- "$ref": "#/definitions/ListPartitionRouter"
- "$ref": "#/definitions/SubstreamPartitionRouter"
- "$ref": "#/definitions/GroupingPartitionRouter"
decoder:
title: Decoder
description: Component decoding the response so records can be extracted.
Expand Down Expand Up @@ -3536,6 +3540,44 @@ definitions:
$parameters:
type: object
additionalProperties: true
  GroupingPartitionRouter:
    title: Grouping Partition Router
    description: >
      A decorator on top of a partition router that groups partitions into batches of a specified size.
      This is useful for APIs that support filtering by multiple partition keys in a single request.
      Note that per-partition incremental syncs may not work as expected because the grouping
      of partitions might change between syncs, potentially leading to inconsistent state tracking.
    type: object
    required:
      - type
      - group_size
      - underlying_partition_router
    properties:
      type:
        type: string
        enum: [GroupingPartitionRouter]
      group_size:
        title: Group Size
        description: The number of partitions to include in each group. This determines how many partition values are batched together in a single slice.
        type: integer
        examples:
          - 10
          - 50
      underlying_partition_router:
        title: Underlying Partition Router
        description: The partition router whose output will be grouped. This can be any valid partition router component.
        anyOf:
          - "$ref": "#/definitions/CustomPartitionRouter"
          - "$ref": "#/definitions/ListPartitionRouter"
          - "$ref": "#/definitions/SubstreamPartitionRouter"
      deduplicate:
        title: Deduplicate Partitions
        description: If true, ensures that partitions are unique within each group by removing duplicates based on the partition key.
        type: boolean
        default: true
      $parameters:
        type: object
        additionalProperties: true
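The batching that this schema entry describes can be pictured with a small sketch. This is not the CDK's router (that module is not part of this hunk); `group_partitions` and the `board_id` key are hypothetical names, and the sketch assumes that two partitions count as duplicates when their JSON forms match and that a duplicate is dropped wherever it reappears in the run.

```python
import json
from typing import Any, Dict, Iterable, Iterator, List


def group_partitions(
    partitions: Iterable[Dict[str, Any]],
    group_size: int,
    deduplicate: bool = True,
) -> Iterator[List[Dict[str, Any]]]:
    """Yield lists of at most `group_size` partitions (hypothetical helper)."""
    if group_size < 1:
        raise ValueError(f"Group size must be greater than 0, got {group_size}")

    seen = set()
    batch: List[Dict[str, Any]] = []
    for partition in partitions:
        if deduplicate:
            # Assumption: identity of a partition is its sorted JSON form.
            key = json.dumps(partition, sort_keys=True)
            if key in seen:
                continue
            seen.add(key)
        batch.append(partition)
        if len(batch) == group_size:
            yield batch
            batch = []
    if batch:
        # Emit the final group, which may be smaller than group_size.
        yield batch


# Five partitions from an imagined underlying router, one of them repeated.
partitions = [{"board_id": i} for i in (1, 2, 2, 3, 4)]
deduplicated_groups = list(group_partitions(partitions, group_size=2))
raw_groups = list(group_partitions(partitions, group_size=2, deduplicate=False))
```

With `deduplicate` left on, the repeated partition is dropped and two full groups come out; with it off, the five partitions spill into a third, smaller group.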
WaitUntilTimeFromHeader:
title: Wait Until Time Defined In Response Header
description: Extract time at which we can retry the request from response header and wait for the difference between now and that time.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def __init__(
connector_state_manager: ConnectorStateManager,
connector_state_converter: AbstractStreamStateConverter,
cursor_field: CursorField,
use_global_cursor: bool = False,
) -> None:
self._global_cursor: Optional[StreamState] = {}
self._stream_name = stream_name
Expand Down Expand Up @@ -106,7 +107,7 @@ def __init__(
self._lookback_window: int = 0
self._parent_state: Optional[StreamState] = None
self._number_of_partitions: int = 0
self._use_global_cursor: bool = False
self._use_global_cursor: bool = use_global_cursor
self._partition_serializer = PerPartitionKeySerializer()
# Track the last time a state message was emitted
self._last_emission_time: float = 0.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2301,7 +2301,15 @@ class SimpleRetriever(BaseModel):
CustomPartitionRouter,
ListPartitionRouter,
SubstreamPartitionRouter,
List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]],
GroupingPartitionRouter,
List[
Union[
CustomPartitionRouter,
ListPartitionRouter,
SubstreamPartitionRouter,
GroupingPartitionRouter,
]
],
]
] = Field(
[],
Expand Down Expand Up @@ -2379,7 +2387,15 @@ class AsyncRetriever(BaseModel):
CustomPartitionRouter,
ListPartitionRouter,
SubstreamPartitionRouter,
List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]],
GroupingPartitionRouter,
List[
Union[
CustomPartitionRouter,
ListPartitionRouter,
SubstreamPartitionRouter,
GroupingPartitionRouter,
]
],
]
] = Field(
[],
Expand Down Expand Up @@ -2431,6 +2447,29 @@ class SubstreamPartitionRouter(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class GroupingPartitionRouter(BaseModel):
    type: Literal["GroupingPartitionRouter"]
    group_size: int = Field(
        ...,
        description="The number of partitions to include in each group. This determines how many partition values are batched together in a single slice.",
        examples=[10, 50],
        title="Group Size",
    )
    underlying_partition_router: Union[
        CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter
    ] = Field(
        ...,
        description="The partition router whose output will be grouped. This can be any valid partition router component.",
        title="Underlying Partition Router",
    )
    deduplicate: Optional[bool] = Field(
        True,
        description="If true, ensures that partitions are unique within each group by removing duplicates based on the partition key.",
        title="Deduplicate Partitions",
    )
    parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
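For orientation, a component definition that this model would accept might look like the dictionary below. It is a sketch only: the `board_id` cursor field and the list values are made up, and `check_definition` is a hypothetical stand-in that mirrors the required fields and the three allowed underlying router types from the schema rather than calling the CDK's own validation.

```python
from typing import Any, Dict, List

# Hypothetical definition; field names follow the schema in this PR.
grouping_router: Dict[str, Any] = {
    "type": "GroupingPartitionRouter",
    "group_size": 10,
    "deduplicate": True,
    "underlying_partition_router": {
        "type": "ListPartitionRouter",
        "values": ["1", "2", "3"],
        "cursor_field": "board_id",
    },
}

REQUIRED_KEYS = ("type", "group_size", "underlying_partition_router")
ALLOWED_UNDERLYING_TYPES = {
    "CustomPartitionRouter",
    "ListPartitionRouter",
    "SubstreamPartitionRouter",
}


def check_definition(definition: Dict[str, Any]) -> List[str]:
    """Return a list of problems found in the definition (empty if none)."""
    problems = [f"missing '{key}'" for key in REQUIRED_KEYS if key not in definition]
    underlying = definition.get("underlying_partition_router", {})
    underlying_type = underlying.get("type")
    if underlying and underlying_type not in ALLOWED_UNDERLYING_TYPES:
        # The schema's anyOf does not list GroupingPartitionRouter itself,
        # so nesting one grouping router inside another is not accepted.
        problems.append(f"unsupported underlying router '{underlying_type}'")
    return problems


problems = check_definition(grouping_router)
nested_problems = check_definition(
    {**grouping_router, "underlying_partition_router": {"type": "GroupingPartitionRouter"}}
)
```

Omitting `deduplicate` is also fine in this sketch, since the model gives it a default of `True`.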


class HttpComponentsResolver(BaseModel):
type: Literal["HttpComponentsResolver"]
retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever] = Field(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
FlattenFields as FlattenFieldsModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
GroupingPartitionRouter as GroupingPartitionRouterModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
GzipDecoder as GzipDecoderModel,
)
Expand Down Expand Up @@ -385,6 +388,7 @@
)
from airbyte_cdk.sources.declarative.partition_routers import (
CartesianProductStreamSlicer,
GroupingPartitionRouter,
ListPartitionRouter,
PartitionRouter,
SinglePartitionRouter,
Expand Down Expand Up @@ -638,6 +642,7 @@ def _init_mappings(self) -> None:
UnlimitedCallRatePolicyModel: self.create_unlimited_call_rate_policy,
RateModel: self.create_rate,
HttpRequestRegexMatcherModel: self.create_http_request_matcher,
GroupingPartitionRouterModel: self.create_grouping_partition_router,
}

# Needed for the case where we need to perform a second parse on the fields of a custom component
Expand Down Expand Up @@ -1355,6 +1360,9 @@ def create_concurrent_cursor_from_perpartition_cursor(
)
stream_state = self.apply_stream_state_migrations(stream_state_migrations, stream_state)

# Per-partition state doesn't make sense for GroupingPartitionRouter, so force the global state
use_global_cursor = isinstance(partition_router, GroupingPartitionRouter)

# Return the concurrent cursor and state converter
return ConcurrentPerPartitionCursor(
cursor_factory=cursor_factory,
Expand All @@ -1366,6 +1374,7 @@ def create_concurrent_cursor_from_perpartition_cursor(
connector_state_manager=state_manager,
connector_state_converter=connector_state_converter,
cursor_field=cursor_field,
use_global_cursor=use_global_cursor,
)

@staticmethod
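The reason for forcing the global cursor can be illustrated with a toy example. The sketch below assumes that per-partition state is keyed by the serialized contents of a slice; the key format and the `parent_ids` name are illustrative and may differ from what `PerPartitionKeySerializer` actually produces. When one parent record disappears between syncs, every later group shifts, so none of the stored keys match the new slices.

```python
import json
from typing import List, Set


def chunk(ids: List[int], size: int) -> List[List[int]]:
    """Split ids into consecutive groups of at most `size` elements."""
    return [ids[i : i + size] for i in range(0, len(ids), size)]


def state_keys(ids: List[int], size: int) -> Set[str]:
    """Build one illustrative state key per group of parent ids."""
    return {json.dumps({"parent_ids": group}) for group in chunk(ids, size)}


# First sync sees parents 1..4; before the second sync, parent 2 is deleted
# and parent 5 is created.
first_sync_keys = state_keys([1, 2, 3, 4], size=2)
second_sync_keys = state_keys([1, 3, 4, 5], size=2)

# Keys that the second sync could reuse from the first sync's state.
reusable_keys = first_sync_keys & second_sync_keys
```

In this toy case `reusable_keys` is empty, which is the situation a single global cursor sidesteps.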
Expand Down Expand Up @@ -3344,3 +3353,34 @@ def set_api_budget(self, component_definition: ComponentDefinition, config: Conf
self._api_budget = self.create_component(
model_type=HTTPAPIBudgetModel, component_definition=component_definition, config=config
)

    def create_grouping_partition_router(
        self, model: GroupingPartitionRouterModel, config: Config, **kwargs: Any
    ) -> GroupingPartitionRouter:
        underlying_router = self._create_component_from_model(
            model=model.underlying_partition_router, config=config
        )
        if model.group_size < 1:
            raise ValueError(f"Group size must be greater than 0, got {model.group_size}")

        # Request options in underlying partition routers are not supported for GroupingPartitionRouter
        # because they are specific to individual partitions and cannot be aggregated or handled
        # when grouping, potentially leading to incorrect API calls. Any request customization
        # should be managed at the stream level through the requester's configuration.
        if isinstance(underlying_router, SubstreamPartitionRouter):
            if any(
                parent_config.request_option
                for parent_config in underlying_router.parent_stream_configs
            ):
                raise ValueError("Request options are not supported for GroupingPartitionRouter.")

        if isinstance(underlying_router, ListPartitionRouter):
            if underlying_router.request_option:
                raise ValueError("Request options are not supported for GroupingPartitionRouter.")

        return GroupingPartitionRouter(
            group_size=model.group_size,
            underlying_partition_router=underlying_router,
            deduplicate=model.deduplicate if model.deduplicate is not None else True,
            config=config,
        )
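The checks in this factory method can be tried out in isolation with stand-in classes. Everything named `Stub...` below is hypothetical and only mimics the two attributes the factory inspects (`request_option` and `parent_stream_configs`); the real routers carry far more state, and their request options are objects rather than plain dicts.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class StubParentConfig:
    # Stand-in for a parent stream config; only request_option matters here.
    request_option: Optional[Dict[str, Any]] = None


@dataclass
class StubSubstreamRouter:
    parent_stream_configs: List[StubParentConfig] = field(default_factory=list)


@dataclass
class StubListRouter:
    request_option: Optional[Dict[str, Any]] = None


def check_grouping_inputs(group_size: int, underlying_router: Any) -> None:
    """Raise ValueError for the same conditions the factory method rejects."""
    if group_size < 1:
        raise ValueError(f"Group size must be greater than 0, got {group_size}")
    if isinstance(underlying_router, StubSubstreamRouter):
        if any(parent.request_option for parent in underlying_router.parent_stream_configs):
            raise ValueError("Request options are not supported for GroupingPartitionRouter.")
    if isinstance(underlying_router, StubListRouter) and underlying_router.request_option:
        raise ValueError("Request options are not supported for GroupingPartitionRouter.")


def is_rejected(group_size: int, underlying_router: Any) -> bool:
    """Return True when the configuration would be refused."""
    try:
        check_grouping_inputs(group_size, underlying_router)
    except ValueError:
        return True
    return False


plain_list_ok = not is_rejected(5, StubListRouter())
zero_size_rejected = is_rejected(0, StubListRouter())
list_option_rejected = is_rejected(5, StubListRouter(request_option={"field_name": "ids"}))
parent_option_rejected = is_rejected(
    5, StubSubstreamRouter([StubParentConfig(request_option={"field_name": "parent_id"})])
)
```

A custom underlying router passes through these checks untouched, since only the list and substream routers are inspected.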
4 changes: 4 additions & 0 deletions airbyte_cdk/sources/declarative/partition_routers/__init__.py
Expand Up @@ -8,6 +8,9 @@
from airbyte_cdk.sources.declarative.partition_routers.cartesian_product_stream_slicer import (
CartesianProductStreamSlicer,
)
from airbyte_cdk.sources.declarative.partition_routers.grouping_partition_router import (
GroupingPartitionRouter,
)
from airbyte_cdk.sources.declarative.partition_routers.list_partition_router import (
ListPartitionRouter,
)
Expand All @@ -22,6 +25,7 @@
__all__ = [
"AsyncJobPartitionRouter",
"CartesianProductStreamSlicer",
"GroupingPartitionRouter",
"ListPartitionRouter",
"SinglePartitionRouter",
"SubstreamPartitionRouter",
Expand Down