Skip to content

Commit

Permalink
Add FixedWindowCallRatePolicy unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
tolik0 committed Feb 9, 2025
1 parent 1285668 commit 7be9842
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -3001,7 +3001,7 @@ def create_fixed_window_call_rate_policy(
]
return FixedWindowCallRatePolicy(
next_reset_ts=model.next_reset_ts,
period=parse_duration(model.period),
period=model.period,
call_limit=model.call_limit,
matchers=matchers,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3644,3 +3644,82 @@ def test_api_budget():
# The 0.1s from 'PT0.1S' is stored in ms by PyRateLimiter internally
# but here just check that the limit and interval exist
assert policy._bucket.rates[0].interval == 100 # 100 ms


def test_api_budget_fixed_window_policy():
manifest = {
"type": "DeclarativeSource",
# Root-level api_budget referencing a FixedWindowCallRatePolicy
"api_budget": {
"type": "APIBudget",
"maximum_attempts_to_acquire": 9999,
"policies": [
{
"type": "FixedWindowCallRatePolicy",
"next_reset_ts": "2025-01-01T00:00:00Z",
"period": "PT1M", # 1 minute
"call_limit": 10,
"matchers": [
{
"type": "HttpRequestRegexMatcher",
"method": "GET",
"url_base": "https://example.org",
"url_path_pattern": "/v2/data",
}
],
}
],
},
# We'll define a single HttpRequester that references that base
"my_requester": {
"type": "HttpRequester",
"path": "/v2/data",
"url_base": "https://example.org",
"http_method": "GET",
"authenticator": {"type": "NoAuth"},
},
}

config = {}

factory = ModelToComponentFactory()
if "api_budget" in manifest:
factory.set_api_budget(manifest["api_budget"], config)

from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
HttpRequester as HttpRequesterModel,
)

requester_definition = manifest["my_requester"]
assert requester_definition["type"] == "HttpRequester"
http_requester = factory.create_component(
model_type=HttpRequesterModel,
component_definition=requester_definition,
config=config,
name="my_stream",
decoder=None,
)

assert http_requester.api_budget is not None
assert http_requester.api_budget.maximum_attempts_to_acquire == 9999
assert len(http_requester.api_budget.policies) == 1

from airbyte_cdk.sources.streams.call_rate import FixedWindowCallRatePolicy

policy = http_requester.api_budget.policies[0]
assert isinstance(policy, FixedWindowCallRatePolicy)
assert policy._call_limit == 10
# The period is "PT1M" => 60 seconds
assert policy._offset.total_seconds() == 60

expected_reset_dt = datetime(2025, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
assert policy._next_reset_ts == expected_reset_dt

assert len(policy._matchers) == 1
matcher = policy._matchers[0]
from airbyte_cdk.sources.streams.call_rate import HttpRequestRegexMatcher

assert isinstance(matcher, HttpRequestRegexMatcher)
assert matcher._method == "GET"
assert matcher._url_base == "https://example.org"
assert matcher._url_path_pattern.pattern == "/v2/data"

0 comments on commit 7be9842

Please sign in to comment.