Skip to content

Commit

Permalink
feat(low-code): Add API Budget (#314)
Browse files Browse the repository at this point in the history
  • Loading branch information
tolik0 authored Feb 12, 2025
1 parent cb5a921 commit fdcded3
Show file tree
Hide file tree
Showing 9 changed files with 876 additions and 48 deletions.
168 changes: 167 additions & 1 deletion airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ properties:
"$ref": "#/definitions/Spec"
concurrency_level:
"$ref": "#/definitions/ConcurrencyLevel"
api_budget:
"$ref": "#/definitions/HTTPAPIBudget"
metadata:
type: object
description: For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata.
Expand Down Expand Up @@ -794,7 +796,7 @@ definitions:
description: This option is used to adjust the upper and lower boundaries of each datetime window to beginning and end of the provided target period (day, week, month)
type: object
required:
- target
- target
properties:
target:
title: Target
Expand Down Expand Up @@ -1365,6 +1367,170 @@ definitions:
$parameters:
type: object
additional_properties: true
HTTPAPIBudget:
title: HTTP API Budget
description: >
Defines how many requests can be made to the API in a given time frame. `HTTPAPIBudget` extracts the remaining
call count and the reset time from HTTP response headers using the header names provided by
`ratelimit_remaining_header` and `ratelimit_reset_header`. Only requests using `HttpRequester`
are rate-limited; custom components that bypass `HttpRequester` are not covered by this budget.
type: object
required:
- type
- policies
properties:
type:
type: string
enum: [HTTPAPIBudget]
policies:
title: Policies
description: List of call rate policies that define how many calls are allowed.
type: array
items:
anyOf:
- "$ref": "#/definitions/FixedWindowCallRatePolicy"
- "$ref": "#/definitions/MovingWindowCallRatePolicy"
- "$ref": "#/definitions/UnlimitedCallRatePolicy"
ratelimit_reset_header:
title: Rate Limit Reset Header
description: The HTTP response header name that indicates when the rate limit resets.
type: string
default: "ratelimit-reset"
ratelimit_remaining_header:
title: Rate Limit Remaining Header
description: The HTTP response header name that indicates the number of remaining allowed calls.
type: string
default: "ratelimit-remaining"
status_codes_for_ratelimit_hit:
title: Status Codes for Rate Limit Hit
description: List of HTTP status codes that indicate a rate limit has been hit.
type: array
items:
type: integer
default: [429]
additionalProperties: true
FixedWindowCallRatePolicy:
title: Fixed Window Call Rate Policy
description: A policy that allows a fixed number of calls within a specific time window.
type: object
required:
- type
- period
- call_limit
- matchers
properties:
type:
type: string
enum: [FixedWindowCallRatePolicy]
period:
title: Period
description: The time interval for the rate limit window.
type: string
call_limit:
title: Call Limit
description: The maximum number of calls allowed within the period.
type: integer
matchers:
title: Matchers
description: List of matchers that define which requests this policy applies to.
type: array
items:
"$ref": "#/definitions/HttpRequestRegexMatcher"
additionalProperties: true
MovingWindowCallRatePolicy:
title: Moving Window Call Rate Policy
description: A policy that allows a fixed number of calls within a moving time window.
type: object
required:
- type
- rates
- matchers
properties:
type:
type: string
enum: [MovingWindowCallRatePolicy]
rates:
title: Rates
description: List of rates that define the call limits for different time intervals.
type: array
items:
"$ref": "#/definitions/Rate"
matchers:
title: Matchers
description: List of matchers that define which requests this policy applies to.
type: array
items:
"$ref": "#/definitions/HttpRequestRegexMatcher"
additionalProperties: true
UnlimitedCallRatePolicy:
title: Unlimited Call Rate Policy
description: A policy that allows unlimited calls for specific requests.
type: object
required:
- type
- matchers
properties:
type:
type: string
enum: [UnlimitedCallRatePolicy]
matchers:
title: Matchers
description: List of matchers that define which requests this policy applies to.
type: array
items:
"$ref": "#/definitions/HttpRequestRegexMatcher"
additionalProperties: true
Rate:
title: Rate
description: Defines a rate limit with a specific number of calls allowed within a time interval.
type: object
required:
- limit
- interval
properties:
limit:
title: Limit
description: The maximum number of calls allowed within the interval.
type: integer
interval:
title: Interval
description: The time interval for the rate limit.
type: string
examples:
- "PT1H"
- "P1D"
additionalProperties: true
HttpRequestRegexMatcher:
title: HTTP Request Matcher
description: >
Matches HTTP requests based on method, base URL, URL path pattern, query parameters, and headers.
Use `url_base` to specify the scheme and host (without trailing slash) and
`url_path_pattern` to apply a regex to the request path.
type: object
properties:
method:
title: Method
description: The HTTP method to match (e.g., GET, POST).
type: string
url_base:
title: URL Base
description: The base URL (scheme and host, e.g. "https://api.example.com") to match.
type: string
url_path_pattern:
title: URL Path Pattern
description: A regular expression pattern to match the URL path.
type: string
params:
title: Parameters
description: The query parameters to match.
type: object
additionalProperties: true
headers:
title: Headers
description: The headers to match.
type: object
additionalProperties: true
additionalProperties: true
DefaultErrorHandler:
title: Default Error Handler
description: Component defining how to handle errors. Default behavior includes only retrying server errors (HTTP 5XX) and too many requests (HTTP 429) with an exponential backoff.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
self._source_config, config
)

api_budget_model = self._source_config.get("api_budget")
if api_budget_model:
self._constructor.set_api_budget(api_budget_model, config)

source_streams = [
self._constructor.create_component(
DeclarativeStreamModel,
Expand Down
126 changes: 126 additions & 0 deletions airbyte_cdk/sources/declarative/models/declarative_component_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,48 @@ class OAuthAuthenticator(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class Rate(BaseModel):
class Config:
extra = Extra.allow

limit: int = Field(
...,
description="The maximum number of calls allowed within the interval.",
title="Limit",
)
interval: str = Field(
...,
description="The time interval for the rate limit.",
examples=["PT1H", "P1D"],
title="Interval",
)


class HttpRequestRegexMatcher(BaseModel):
class Config:
extra = Extra.allow

method: Optional[str] = Field(
None, description="The HTTP method to match (e.g., GET, POST).", title="Method"
)
url_base: Optional[str] = Field(
None,
description='The base URL (scheme and host, e.g. "https://api.example.com") to match.',
title="URL Base",
)
url_path_pattern: Optional[str] = Field(
None,
description="A regular expression pattern to match the URL path.",
title="URL Path Pattern",
)
params: Optional[Dict[str, Any]] = Field(
None, description="The query parameters to match.", title="Parameters"
)
headers: Optional[Dict[str, Any]] = Field(
None, description="The headers to match.", title="Headers"
)


class DpathExtractor(BaseModel):
type: Literal["DpathExtractor"]
field_path: List[str] = Field(
Expand Down Expand Up @@ -1565,6 +1607,55 @@ class DatetimeBasedCursor(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class FixedWindowCallRatePolicy(BaseModel):
class Config:
extra = Extra.allow

type: Literal["FixedWindowCallRatePolicy"]
period: str = Field(
..., description="The time interval for the rate limit window.", title="Period"
)
call_limit: int = Field(
...,
description="The maximum number of calls allowed within the period.",
title="Call Limit",
)
matchers: List[HttpRequestRegexMatcher] = Field(
...,
description="List of matchers that define which requests this policy applies to.",
title="Matchers",
)


class MovingWindowCallRatePolicy(BaseModel):
class Config:
extra = Extra.allow

type: Literal["MovingWindowCallRatePolicy"]
rates: List[Rate] = Field(
...,
description="List of rates that define the call limits for different time intervals.",
title="Rates",
)
matchers: List[HttpRequestRegexMatcher] = Field(
...,
description="List of matchers that define which requests this policy applies to.",
title="Matchers",
)


class UnlimitedCallRatePolicy(BaseModel):
class Config:
extra = Extra.allow

type: Literal["UnlimitedCallRatePolicy"]
matchers: List[HttpRequestRegexMatcher] = Field(
...,
description="List of matchers that define which requests this policy applies to.",
title="Matchers",
)


class DefaultErrorHandler(BaseModel):
type: Literal["DefaultErrorHandler"]
backoff_strategies: Optional[
Expand Down Expand Up @@ -1696,6 +1787,39 @@ class CompositeErrorHandler(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class HTTPAPIBudget(BaseModel):
class Config:
extra = Extra.allow

type: Literal["HTTPAPIBudget"]
policies: List[
Union[
FixedWindowCallRatePolicy,
MovingWindowCallRatePolicy,
UnlimitedCallRatePolicy,
]
] = Field(
...,
description="List of call rate policies that define how many calls are allowed.",
title="Policies",
)
ratelimit_reset_header: Optional[str] = Field(
"ratelimit-reset",
description="The HTTP response header name that indicates when the rate limit resets.",
title="Rate Limit Reset Header",
)
ratelimit_remaining_header: Optional[str] = Field(
"ratelimit-remaining",
description="The HTTP response header name that indicates the number of remaining allowed calls.",
title="Rate Limit Remaining Header",
)
status_codes_for_ratelimit_hit: Optional[List[int]] = Field(
[429],
description="List of HTTP status codes that indicate a rate limit has been hit.",
title="Status Codes for Rate Limit Hit",
)


class ZipfileDecoder(BaseModel):
class Config:
extra = Extra.allow
Expand Down Expand Up @@ -1724,6 +1848,7 @@ class Config:
definitions: Optional[Dict[str, Any]] = None
spec: Optional[Spec] = None
concurrency_level: Optional[ConcurrencyLevel] = None
api_budget: Optional[HTTPAPIBudget] = None
metadata: Optional[Dict[str, Any]] = Field(
None,
description="For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata.",
Expand All @@ -1750,6 +1875,7 @@ class Config:
definitions: Optional[Dict[str, Any]] = None
spec: Optional[Spec] = None
concurrency_level: Optional[ConcurrencyLevel] = None
api_budget: Optional[HTTPAPIBudget] = None
metadata: Optional[Dict[str, Any]] = Field(
None,
description="For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata.",
Expand Down
Loading

0 comments on commit fdcded3

Please sign in to comment.