Skip to content

Commit a5f46eb

Browse files
committed
telemetry circuit breaker
Signed-off-by: Sai Shree Pradhan <[email protected]>
1 parent 59d28b0 commit a5f46eb

File tree

4 files changed

+126
-3
lines changed

4 files changed

+126
-3
lines changed

poetry.lock

Lines changed: 15 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ oauthlib = "^3.1.0"
2121
openpyxl = "^3.0.10"
2222
urllib3 = ">=1.26"
2323
python-dateutil = "^2.8.0"
24+
pybreaker = "^0.6.0"
2425
pyarrow = [
2526
{ version = ">=14.0.1", python = ">=3.8,<3.13", optional=true },
2627
{ version = ">=18.0.0", python = ">=3.13", optional=true }

src/databricks/sql/common/http.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import logging
1010
from requests.adapters import HTTPAdapter
1111
from databricks.sql.auth.retry import DatabricksRetryPolicy, CommandType
12+
from pybreaker import CircuitBreaker, CircuitBreakerError
1213

1314
logger = logging.getLogger(__name__)
1415

@@ -109,6 +110,9 @@ class TelemetryHttpClient: # TODO: Unify all the http clients in the PySQL Conn
109110
TELEMETRY_RETRY_DELAY_MAX = 10.0
110111
TELEMETRY_RETRY_STOP_AFTER_ATTEMPTS_DURATION = 30.0
111112

113+
CIRCUIT_BREAKER_FAIL_MAX = 5
114+
CIRCUIT_BREAKER_RESET_TIMEOUT = 60
115+
112116
def __init__(self):
113117
"""Initializes the session and mounts the custom retry adapter."""
114118
retry_policy = DatabricksRetryPolicy(
@@ -123,6 +127,10 @@ def __init__(self):
123127
self.session = requests.Session()
124128
self.session.mount("https://", adapter)
125129
self.session.mount("http://", adapter)
130+
self.breaker = CircuitBreaker(
131+
fail_max=self.CIRCUIT_BREAKER_FAIL_MAX,
132+
reset_timeout=self.CIRCUIT_BREAKER_RESET_TIMEOUT,
133+
)
126134

127135
@classmethod
128136
def get_instance(cls) -> "TelemetryHttpClient":
@@ -141,7 +149,14 @@ def post(self, url: str, **kwargs) -> requests.Response:
141149
This is a blocking call intended to be run in a background thread.
142150
"""
143151
logger.debug("Executing telemetry POST request to: %s", url)
144-
return self.session.post(url, **kwargs)
152+
try:
153+
return self.breaker.call(self.session.post, url, **kwargs)
154+
except CircuitBreakerError as e:
155+
logger.error("Circuit breaker error: %s", e)
156+
raise e
157+
except Exception as e:
158+
logger.error("Error executing telemetry POST request: %s", e)
159+
raise e
145160

146161
def close(self):
147162
"""Closes the underlying requests.Session."""

tests/unit/test_telemetry.py

Lines changed: 94 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
import uuid
22
import pytest
33
from unittest.mock import patch, MagicMock
4+
import time
5+
import requests
6+
from pybreaker import CircuitBreakerError
7+
from databricks.sql.common.http import TelemetryHttpClient
48

59
from databricks.sql.telemetry.telemetry_client import (
610
TelemetryClient,
711
NoopTelemetryClient,
812
TelemetryClientFactory,
913
TelemetryHelper,
10-
BaseTelemetryClient,
1114
)
1215
from databricks.sql.telemetry.models.enums import AuthMech, AuthFlow
1316
from databricks.sql.auth.authenticators import (
@@ -316,3 +319,93 @@ def test_connection_failure_sends_correct_telemetry_payload(
316319
call_arguments = mock_export_failure_log.call_args
317320
assert call_arguments[0][0] == "Exception"
318321
assert call_arguments[0][1] == error_message
322+
323+
324+
class TestTelemetryHttpClient:
    """Tests for the TelemetryHttpClient, including retry and circuit breaker logic."""

    # Extra time slept on top of ``reset_timeout`` before probing an open
    # breaker. Sleeping for exactly ``reset_timeout`` lands on the boundary of
    # the breaker's "has the timeout elapsed?" check, which can make the
    # half-open tests flaky on platforms with a coarse clock.
    RESET_TIMEOUT_MARGIN = 0.05

    @pytest.fixture
    def http_client(self):
        """
        Provides a fresh TelemetryHttpClient instance for each test,
        ensuring the singleton state is reset.

        NOTE(review): this relies on ``close()`` clearing the singleton so that
        the following ``get_instance()`` builds a new client (and a new
        breaker) -- confirm against TelemetryHttpClient.close().
        """
        if TelemetryHttpClient._instance:
            TelemetryHttpClient.get_instance().close()

        client = TelemetryHttpClient.get_instance()
        yield client

        client.close()

    def test_circuit_breaker_opens_after_failures(self, http_client):
        """Verify the circuit opens after N consecutive failures and rejects new calls."""
        fail_max = 3
        http_client.breaker.fail_max = fail_max

        with patch.object(http_client.session, "post") as mock_post:
            mock_post.side_effect = requests.exceptions.RequestException(
                "Connection failed"
            )

            # Failures below the threshold surface the underlying error.
            for _ in range(fail_max - 1):
                with pytest.raises(requests.exceptions.RequestException):
                    http_client.post("https://test.com/telemetry")

            # The failure that reaches fail_max is expected to trip the
            # breaker and be reported as a CircuitBreakerError.
            with pytest.raises(CircuitBreakerError):
                http_client.post("https://test.com/telemetry")

            assert http_client.breaker.current_state == "open"
            assert mock_post.call_count == fail_max

            # While open, calls are rejected without reaching the session.
            with pytest.raises(CircuitBreakerError):
                http_client.post("https://test.com/telemetry")
            assert mock_post.call_count == fail_max

    def test_circuit_breaker_closes_after_timeout_and_success(self, http_client):
        """Verify the circuit moves to half-open and then closes after a successful probe."""
        fail_max = 2
        reset_timeout = 0.1
        http_client.breaker.fail_max = fail_max
        http_client.breaker.reset_timeout = reset_timeout

        with patch.object(http_client.session, "post") as mock_post:
            # Two failures trip the breaker; the third call is the probe.
            mock_post.side_effect = [
                requests.exceptions.RequestException("Fail 1"),
                requests.exceptions.RequestException("Fail 2"),
                MagicMock(ok=True),
            ]

            with pytest.raises(requests.exceptions.RequestException):
                http_client.post("https://test.com")
            with pytest.raises(CircuitBreakerError):
                http_client.post("https://test.com")

            assert http_client.breaker.current_state == "open"
            # Wait strictly longer than reset_timeout so the next call is
            # allowed through as the half-open probe.
            time.sleep(reset_timeout + self.RESET_TIMEOUT_MARGIN)

            http_client.post("https://test.com")
            assert http_client.breaker.current_state == "closed"
            assert mock_post.call_count == 3

    def test_circuit_breaker_reopens_if_probe_fails(self, http_client):
        """Verify the circuit moves to half-open and then back to open if the probe fails."""
        fail_max = 2
        reset_timeout = 0.1
        http_client.breaker.fail_max = fail_max
        http_client.breaker.reset_timeout = reset_timeout

        with patch.object(http_client.session, "post") as mock_post:
            mock_post.side_effect = requests.exceptions.RequestException(
                "Always fails"
            )

            with pytest.raises(requests.exceptions.RequestException):
                http_client.post("https://test.com")
            with pytest.raises(CircuitBreakerError):
                http_client.post("https://test.com")

            assert http_client.breaker.current_state == "open"
            # Wait strictly longer than reset_timeout so the next call is
            # attempted as the half-open probe instead of being rejected.
            time.sleep(reset_timeout + self.RESET_TIMEOUT_MARGIN)

            # The probe reaches the session (third call), fails, and the
            # breaker is expected to re-open with a CircuitBreakerError.
            with pytest.raises(CircuitBreakerError):
                http_client.post("https://test.com")

            assert http_client.breaker.current_state == "open"
            assert mock_post.call_count == 3

0 commit comments

Comments
 (0)