Skip to content

Commit

Permalink
Fix logging
Browse files Browse the repository at this point in the history
  • Loading branch information
tolik0 committed Feb 11, 2025
1 parent 3000dca commit 675d829
Showing 1 changed file with 81 additions and 20 deletions.
101 changes: 81 additions & 20 deletions airbyte_cdk/sources/streams/call_rate.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,12 @@ def __call__(self, request: Any) -> bool:
return False
return True

def __str__(self) -> str:
return (
f"HttpRequestMatcher(method={self._method}, url={self._url}, "
f"params={self._params}, headers={self._headers})"
)


class HttpRequestRegexMatcher(RequestMatcher):
"""
Expand Down Expand Up @@ -255,6 +261,13 @@ def __call__(self, request: Any) -> bool:

return True

def __str__(self) -> str:
regex = self._url_path_pattern.pattern if self._url_path_pattern else None
return (
f"HttpRequestRegexMatcher(method={self._method}, url_base={self._url_base}, "
f"url_path_pattern={regex}, params={self._params}, headers={self._headers})"
)


class BaseCallRatePolicy(AbstractCallRatePolicy, abc.ABC):
def __init__(self, matchers: list[RequestMatcher]):
Expand Down Expand Up @@ -353,6 +366,14 @@ def try_acquire(self, request: Any, weight: int) -> None:

self._calls_num += weight

def __str__(self) -> str:
matcher_str = ", ".join(f"{matcher}" for matcher in self._matchers)
return (
f"FixedWindowCallRatePolicy(call_limit={self._call_limit}, period={self._offset}, "
f"calls_used={self._calls_num}, next_reset={self._next_reset_ts}, "
f"matchers=[{matcher_str}])"
)

def update(
self, available_calls: Optional[int], call_reset_ts: Optional[datetime.datetime]
) -> None:
Expand Down Expand Up @@ -459,6 +480,19 @@ def update(
# if available_calls is not None and call_reset_ts is not None:
# ts = call_reset_ts.timestamp()

def __str__(self) -> str:
"""Return a human-friendly description of the moving window rate policy for logging purposes."""
rates_info = ", ".join(
f"{rate.limit} per {timedelta(milliseconds=rate.interval)}"
for rate in self._bucket.rates
)
current_bucket_count = self._bucket.count()
matcher_str = ", ".join(f"{matcher}" for matcher in self._matchers)
return (
f"MovingWindowCallRatePolicy(rates=[{rates_info}], current_bucket_count={current_bucket_count}, "
f"matchers=[{matcher_str}])"
)


class AbstractAPIBudget(abc.ABC):
"""Interface to some API where a client allowed to have N calls per T interval.
Expand Down Expand Up @@ -511,6 +545,23 @@ def __init__(
self._policies = policies
self._maximum_attempts_to_acquire = maximum_attempts_to_acquire

def _extract_endpoint(self, request: Any) -> str:
"""Extract the endpoint URL from the request if available."""
endpoint = None
try:
# If the request is already a PreparedRequest, it should have a URL.
if isinstance(request, requests.PreparedRequest):
endpoint = request.url
# If it's a requests.Request, we call prepare() to extract the URL.
elif isinstance(request, requests.Request):
prepared = request.prepare()
endpoint = prepared.url
except Exception as e:
logger.debug(f"Error extracting endpoint: {e}")
if endpoint:
return endpoint
return "unknown endpoint"

def get_matching_policy(self, request: Any) -> Optional[AbstractCallRatePolicy]:
for policy in self._policies:
if policy.matches(request):
Expand All @@ -524,20 +575,24 @@ def acquire_call(
Matchers will be called sequentially in the same order they were added.
The first matcher that returns True will
:param request:
:param block: when true (default) will block the current thread until call credit is available
:param timeout: if provided will limit maximum time in block, otherwise will wait until credit is available
:raises: CallRateLimitHit - when no calls left and if timeout was set the waiting time exceed the timeout
:param request: the API request
:param block: when True (default) will block until a call credit is available
:param timeout: if provided, limits maximum waiting time; otherwise, waits indefinitely
:raises: CallRateLimitHit if the call credit cannot be acquired within the timeout
"""

policy = self.get_matching_policy(request)
endpoint = self._extract_endpoint(request)
if policy:
logger.debug(f"Acquiring call for endpoint {endpoint} using policy: {policy}")
self._do_acquire(request=request, policy=policy, block=block, timeout=timeout)
elif self._policies:
logger.info("no policies matched with requests, allow call by default")
logger.debug(
f"No policies matched for endpoint {endpoint} (request: {request}). Allowing call by default."
)

def update_from_response(self, request: Any, response: Any) -> None:
"""Update budget information based on response from API
"""Update budget information based on the API response.
:param request: the initial request that triggered this response
:param response: response from the API
Expand All @@ -547,15 +602,17 @@ def update_from_response(self, request: Any, response: Any) -> None:
def _do_acquire(
self, request: Any, policy: AbstractCallRatePolicy, block: bool, timeout: Optional[float]
) -> None:
"""Internal method to try to acquire a call credit
"""Internal method to try to acquire a call credit.
:param request:
:param policy:
:param block:
:param timeout:
:param request: the API request
:param policy: the matching rate-limiting policy
:param block: indicates whether to block until a call credit is available
:param timeout: maximum time to wait if blocking
:raises: CallRateLimitHit if unable to acquire a call credit
"""
last_exception = None
# sometimes we spend all budget before a second attempt, so we have few more here
endpoint = self._extract_endpoint(request)
# sometimes we spend all budget before a second attempt, so we have a few more attempts
for attempt in range(1, self._maximum_attempts_to_acquire):
try:
policy.try_acquire(request, weight=1)
Expand All @@ -567,20 +624,24 @@ def _do_acquire(
time_to_wait = min(timedelta(seconds=timeout), exc.time_to_wait)
else:
time_to_wait = exc.time_to_wait

time_to_wait = max(
timedelta(0), time_to_wait
) # sometimes we get negative duration
logger.info(
"reached call limit %s. going to sleep for %s", exc.rate, time_to_wait
# Ensure we never sleep for a negative duration.
time_to_wait = max(timedelta(0), time_to_wait)
logger.debug(
f"Policy {policy} reached call limit for endpoint {endpoint} ({exc.rate}). "
f"Sleeping for {time_to_wait} on attempt {attempt}."
)
time.sleep(time_to_wait.total_seconds())
else:
logger.debug(
f"Policy {policy} reached call limit for endpoint {endpoint} ({exc.rate}) "
f"and blocking is disabled."
)
raise

if last_exception:
logger.info(
"we used all %s attempts to acquire and failed", self._maximum_attempts_to_acquire
logger.debug(
f"Exhausted all {self._maximum_attempts_to_acquire} attempts to acquire a call for endpoint {endpoint} "
f"using policy: {policy}"
)
raise last_exception

Expand Down

0 comments on commit 675d829

Please sign in to comment.