Skip to content

Commit 5c32297

Browse files
authored
feat(Low-Code CDK Property Chunking): Allow fetching query properties and property chunking for low-code sources (#452)
1 parent caa1e7d commit 5c32297

22 files changed

+2099
-51
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

+118-2
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ definitions:
341341
properties:
342342
type:
343343
type: string
344-
enum: [ DynamicStreamCheckConfig ]
344+
enum: [DynamicStreamCheckConfig]
345345
dynamic_stream_name:
346346
title: Dynamic Stream Name
347347
description: The dynamic stream name.
@@ -1752,6 +1752,30 @@ definitions:
17521752
$parameters:
17531753
type: object
17541754
additionalProperties: true
1755+
GroupByKeyMergeStrategy:
1756+
title: Group by Key
1757+
description: Record merge strategy that combines records according to fields on the record.
1758+
required:
1759+
- type
1760+
- key
1761+
properties:
1762+
type:
1763+
type: string
1764+
enum: [GroupByKeyMergeStrategy]
1765+
key:
1766+
title: Key
1767+
description: The name of the field on the record whose value will be used to group properties that were retrieved through multiple API requests.
1768+
anyOf:
1769+
- type: string
1770+
- type: array
1771+
items:
1772+
type: string
1773+
examples:
1774+
- "id"
1775+
- ["parent_id", "end_date"]
1776+
$parameters:
1777+
type: object
1778+
additionalProperties: true
17551779
SessionTokenAuthenticator:
17561780
type: object
17571781
required:
@@ -1971,7 +1995,9 @@ definitions:
19711995
- type: string
19721996
- type: object
19731997
additionalProperties:
1974-
type: string
1998+
anyOf:
1999+
- type: string
2000+
- $ref": "#/definitions/QueryProperties"
19752001
interpolation_context:
19762002
- next_page_token
19772003
- stream_interval
@@ -2989,6 +3015,96 @@ definitions:
29893015
examples:
29903016
- id
29913017
- ["code", "type"]
3018+
PropertiesFromEndpoint:
3019+
title: Properties from Endpoint
3020+
description: Defines the behavior for fetching the list of properties from an API that will be loaded into the requests to extract records.
3021+
type: object
3022+
required:
3023+
- type
3024+
- property_field_path
3025+
- retriever
3026+
properties:
3027+
type:
3028+
type: string
3029+
enum: [PropertiesFromEndpoint]
3030+
property_field_path:
3031+
description: Describes the path to the field that should be extracted
3032+
type: array
3033+
items:
3034+
type: string
3035+
examples:
3036+
- ["name"]
3037+
interpolation_context:
3038+
- config
3039+
- parameters
3040+
retriever:
3041+
description: Requester component that describes how to fetch the properties to query from a remote API endpoint.
3042+
anyOf:
3043+
- "$ref": "#/definitions/CustomRetriever"
3044+
- "$ref": "#/definitions/SimpleRetriever"
3045+
$parameters:
3046+
type: object
3047+
additionalProperties: true
3048+
PropertyChunking:
3049+
title: Property Chunking
3050+
description: For APIs with restrictions on the amount of properties that can be requester per request, property chunking can be applied to make multiple requests with a subset of the properties.
3051+
type: object
3052+
required:
3053+
- type
3054+
- property_limit_type
3055+
properties:
3056+
type:
3057+
type: string
3058+
enum: [PropertyChunking]
3059+
property_limit_type:
3060+
title: Property Limit Type
3061+
description: The type used to determine the maximum number of properties per chunk
3062+
enum:
3063+
- characters
3064+
- property_count
3065+
property_limit:
3066+
title: Property Limit
3067+
description: The maximum amount of properties that can be retrieved per request according to the limit type.
3068+
type: integer
3069+
record_merge_strategy:
3070+
title: Record Merge Strategy
3071+
description: Dictates how to records that require multiple requests to get all properties should be emitted to the destination
3072+
"$ref": "#/definitions/GroupByKeyMergeStrategy"
3073+
$parameters:
3074+
type: object
3075+
additionalProperties: true
3076+
QueryProperties:
3077+
title: Query Properties
3078+
description: For APIs that require explicit specification of the properties to query for, this component specifies which property fields and how they are supplied to outbound requests.
3079+
type: object
3080+
required:
3081+
- type
3082+
- property_list
3083+
properties:
3084+
type:
3085+
type: string
3086+
enum: [QueryProperties]
3087+
property_list:
3088+
title: Property List
3089+
description: The set of properties that will be queried for in the outbound request. This can either be statically defined or dynamic based on an API endpoint
3090+
anyOf:
3091+
- type: array
3092+
items:
3093+
type: string
3094+
- "$ref": "#/definitions/PropertiesFromEndpoint"
3095+
always_include_properties:
3096+
title: Always Include Properties
3097+
description: The list of properties that should be included in every set of properties when multiple chunks of properties are being requested.
3098+
type: array
3099+
items:
3100+
type: string
3101+
property_chunking:
3102+
title: Property Chunking
3103+
description: Defines how query properties will be grouped into smaller sets for APIs with limitations on the number of properties fetched per API request.
3104+
"$ref": "#/definitions/PropertyChunking"
3105+
$parameters:
3106+
type: object
3107+
additionalProperties: true
29923108
RecordFilter:
29933109
title: Record Filter
29943110
description: Filter applied on a list of records.

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

+75-2
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
2+
13
# generated by datamodel-codegen:
24
# filename: declarative_component_schema.yaml
35

@@ -49,7 +51,7 @@ class DynamicStreamCheckConfig(BaseModel):
4951
)
5052
stream_count: Optional[int] = Field(
5153
0,
52-
description="Numbers of the streams to try reading from when running a check operation.",
54+
description="The number of streams to attempt reading from during a check operation. If `stream_count` exceeds the total number of available streams, the minimum of the two values will be used.",
5355
title="Stream Count",
5456
)
5557

@@ -718,6 +720,17 @@ class ExponentialBackoffStrategy(BaseModel):
718720
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
719721

720722

723+
class GroupByKeyMergeStrategy(BaseModel):
724+
type: Literal["GroupByKeyMergeStrategy"]
725+
key: Union[str, List[str]] = Field(
726+
...,
727+
description="The name of the field on the record whose value will be used to group properties that were retrieved through multiple API requests.",
728+
examples=["id", ["parent_id", "end_date"]],
729+
title="Key",
730+
)
731+
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
732+
733+
721734
class SessionTokenRequestBearerAuthenticator(BaseModel):
722735
type: Literal["Bearer"]
723736

@@ -1189,6 +1202,31 @@ class PrimaryKey(BaseModel):
11891202
)
11901203

11911204

1205+
class PropertyLimitType(Enum):
1206+
characters = "characters"
1207+
property_count = "property_count"
1208+
1209+
1210+
class PropertyChunking(BaseModel):
1211+
type: Literal["PropertyChunking"]
1212+
property_limit_type: PropertyLimitType = Field(
1213+
...,
1214+
description="The type used to determine the maximum number of properties per chunk",
1215+
title="Property Limit Type",
1216+
)
1217+
property_limit: Optional[int] = Field(
1218+
None,
1219+
description="The maximum amount of properties that can be retrieved per request according to the limit type.",
1220+
title="Property Limit",
1221+
)
1222+
record_merge_strategy: Optional[GroupByKeyMergeStrategy] = Field(
1223+
None,
1224+
description="Dictates how to records that require multiple requests to get all properties should be emitted to the destination",
1225+
title="Record Merge Strategy",
1226+
)
1227+
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
1228+
1229+
11921230
class RecordFilter(BaseModel):
11931231
type: Literal["RecordFilter"]
11941232
condition: Optional[str] = Field(
@@ -2187,7 +2225,7 @@ class HttpRequester(BaseModel):
21872225
examples=[{"Output-Format": "JSON"}, {"Version": "{{ config['version'] }}"}],
21882226
title="Request Headers",
21892227
)
2190-
request_parameters: Optional[Union[str, Dict[str, str]]] = Field(
2228+
request_parameters: Optional[Union[str, Dict[str, Union[str, Any]]]] = Field(
21912229
None,
21922230
description="Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.",
21932231
examples=[
@@ -2277,6 +2315,40 @@ class ParentStreamConfig(BaseModel):
22772315
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
22782316

22792317

2318+
class PropertiesFromEndpoint(BaseModel):
2319+
type: Literal["PropertiesFromEndpoint"]
2320+
property_field_path: List[str] = Field(
2321+
...,
2322+
description="Describes the path to the field that should be extracted",
2323+
examples=[["name"]],
2324+
)
2325+
retriever: Union[CustomRetriever, SimpleRetriever] = Field(
2326+
...,
2327+
description="Requester component that describes how to fetch the properties to query from a remote API endpoint.",
2328+
)
2329+
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
2330+
2331+
2332+
class QueryProperties(BaseModel):
2333+
type: Literal["QueryProperties"]
2334+
property_list: Union[List[str], PropertiesFromEndpoint] = Field(
2335+
...,
2336+
description="The set of properties that will be queried for in the outbound request. This can either be statically defined or dynamic based on an API endpoint",
2337+
title="Property List",
2338+
)
2339+
always_include_properties: Optional[List[str]] = Field(
2340+
None,
2341+
description="The list of properties that should be included in every set of properties when multiple chunks of properties are being requested.",
2342+
title="Always Include Properties",
2343+
)
2344+
property_chunking: Optional[PropertyChunking] = Field(
2345+
None,
2346+
description="Defines how query properties will be grouped into smaller sets for APIs with limitations on the number of properties fetched per API request.",
2347+
title="Property Chunking",
2348+
)
2349+
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
2350+
2351+
22802352
class StateDelegatingStream(BaseModel):
22812353
type: Literal["StateDelegatingStream"]
22822354
name: str = Field(..., description="The stream name.", example=["Users"], title="Name")
@@ -2525,5 +2597,6 @@ class DynamicDeclarativeStream(BaseModel):
25252597
SessionTokenAuthenticator.update_forward_refs()
25262598
DynamicSchemaLoader.update_forward_refs()
25272599
ParentStreamConfig.update_forward_refs()
2600+
PropertiesFromEndpoint.update_forward_refs()
25282601
SimpleRetriever.update_forward_refs()
25292602
AsyncRetriever.update_forward_refs()

0 commit comments

Comments
 (0)