Skip to content

Commit 019acd8

Browse files
author
Jesse
authored
Use urllib3 for retries (#182)
Behaviour is gated behind `enable_v3_retries` config. This will be removed and become the default behaviour in a subsequent release. Signed-off-by: Jesse Whitehouse <[email protected]>
1 parent 00a3928 commit 019acd8

File tree

9 files changed

+807
-11
lines changed

9 files changed

+807
-11
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
## 2.8.x (Unreleased)
44

5+
- Replace retry handling with DatabricksRetryPolicy. This is disabled by default. To enable, set `enable_v3_retries=True` when creating `databricks.sql.client`
56
- Other: Fix typo in README quick start example
67
- Other: Add autospec to Client mocks and tidy up `make_request`
78

src/databricks/sql/auth/retry.py

Lines changed: 410 additions & 0 deletions
Large diffs are not rendered by default.

src/databricks/sql/auth/thrift_http_client.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515

1616
from urllib3 import HTTPConnectionPool, HTTPSConnectionPool, ProxyManager
1717

18+
from databricks.sql.auth.retry import CommandType, DatabricksRetryPolicy
19+
1820

1921
class THttpClient(thrift.transport.THttpClient.THttpClient):
2022
def __init__(
@@ -28,6 +30,7 @@ def __init__(
2830
key_file=None,
2931
ssl_context=None,
3032
max_connections: int = 1,
33+
retry_policy: Union[DatabricksRetryPolicy, int] = 0,
3134
):
3235
if port is not None:
3336
warnings.warn(
@@ -81,6 +84,10 @@ def __init__(
8184

8285
self.max_connections = max_connections
8386

87+
# If retry_policy == 0 then urllib3 will not retry automatically
88+
# this falls back to the pre-v3 behaviour where thrift_backend.py handles retry logic
89+
self.retry_policy = retry_policy
90+
8491
self.__wbuf = BytesIO()
8592
self.__resp: Union[None, HTTPResponse] = None
8693
self.__timeout = None
@@ -92,6 +99,13 @@ def setCustomHeaders(self, headers: Dict[str, str]):
9299
self._headers = headers
93100
super().setCustomHeaders(headers)
94101

102+
def startRetryTimer(self):
103+
"""Notify DatabricksRetryPolicy of the request start time
104+
105+
This is used to enforce the retry_stop_after_attempts_duration
106+
"""
107+
self.retry_policy and self.retry_policy.start_retry_timer()
108+
95109
def open(self):
96110

97111
# self.__pool replaces the self.__http used by the original THttpClient
@@ -167,6 +181,7 @@ def flush(self):
167181
headers=headers,
168182
preload_content=False,
169183
timeout=self.__timeout,
184+
retries=self.retry_policy,
170185
)
171186

172187
# Get reply to flush the request
@@ -188,3 +203,12 @@ def basic_proxy_auth_header(proxy):
188203
)
189204
cr = base64.b64encode(ap.encode()).strip()
190205
return "Basic " + six.ensure_str(cr)
206+
207+
def set_retry_command_type(self, value: CommandType):
208+
"""Pass the provided CommandType to the retry policy"""
209+
if isinstance(self.retry_policy, DatabricksRetryPolicy):
210+
self.retry_policy.command_type = value
211+
else:
212+
logger.warning(
213+
"DatabricksRetryPolicy is currently bypassed. The CommandType cannot be set."
214+
)

src/databricks/sql/client.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,11 @@
88

99
from databricks.sql import __version__
1010
from databricks.sql import *
11-
from databricks.sql.exc import OperationalError
11+
from databricks.sql.exc import (
12+
OperationalError,
13+
SessionAlreadyClosedError,
14+
CursorAlreadyClosedError,
15+
)
1216
from databricks.sql.thrift_backend import ThriftBackend
1317
from databricks.sql.utils import ExecuteResponse, ParamEscaper, inject_parameters
1418
from databricks.sql.types import Row
@@ -257,6 +261,9 @@ def _close(self, close_cursors=True) -> None:
257261

258262
try:
259263
self.thrift_backend.close_session(self._session_handle)
264+
except RequestError as e:
265+
if isinstance(e.args[1], SessionAlreadyClosedError):
266+
logger.info("Session was closed by a prior request")
260267
except DatabaseError as e:
261268
if "Invalid SessionHandle" in str(e):
262269
logger.warning(
@@ -958,6 +965,9 @@ def close(self) -> None:
958965
and self.connection.open
959966
):
960967
self.thrift_backend.close_command(self.command_id)
968+
except RequestError as e:
969+
if isinstance(e.args[1], CursorAlreadyClosedError):
970+
logger.info("Operation was canceled by a prior request")
961971
finally:
962972
self.has_been_closed_server_side = True
963973
self.op_state = self.thrift_backend.CLOSED_OP_STATE

src/databricks/sql/exc.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,3 +93,25 @@ class RequestError(OperationalError):
9393
"""
9494

9595
pass
96+
97+
98+
class MaxRetryDurationError(RequestError):
99+
"""Thrown if the next HTTP request retry would exceed the configured
100+
stop_after_attempts_duration
101+
"""
102+
103+
104+
class NonRecoverableNetworkError(RequestError):
105+
"""Thrown if an HTTP code 501 is received"""
106+
107+
108+
class UnsafeToRetryError(RequestError):
109+
"""Thrown if ExecuteStatement request receives a code other than 200, 429, or 503"""
110+
111+
112+
class SessionAlreadyClosedError(RequestError):
113+
"""Thrown if CloseSession receives a code 404. ThriftBackend should gracefully proceed as this is expected."""
114+
115+
116+
class CursorAlreadyClosedError(RequestError):
117+
"""Thrown if CancelOperation receives a code 404. ThriftBackend should gracefully proceed as this is expected."""

src/databricks/sql/thrift_backend.py

Lines changed: 51 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@
1717
import urllib3.exceptions
1818

1919
import databricks.sql.auth.thrift_http_client
20+
from databricks.sql.auth.thrift_http_client import CommandType
2021
from databricks.sql.auth.authenticators import AuthProvider
2122
from databricks.sql.thrift_api.TCLIService import TCLIService, ttypes
2223
from databricks.sql import *
24+
from databricks.sql.exc import MaxRetryDurationError
2325
from databricks.sql.thrift_api.TCLIService.TCLIService import (
2426
Client as TCLIServiceClient,
2527
)
@@ -70,6 +72,12 @@ class ThriftBackend:
7072
CLOSED_OP_STATE = ttypes.TOperationState.CLOSED_STATE
7173
ERROR_OP_STATE = ttypes.TOperationState.ERROR_STATE
7274

75+
_retry_delay_min: float
76+
_retry_delay_max: float
77+
_retry_stop_after_attempts_count: int
78+
_retry_stop_after_attempts_duration: float
79+
_retry_delay_default: float
80+
7381
def __init__(
7482
self,
7583
server_hostname: str,
@@ -113,9 +121,15 @@ def __init__(
113121
#
114122
# _retry_stop_after_attempts_count
115123
# The maximum number of times we should retry retryable requests (defaults to 24)
124+
# _retry_dangerous_codes
125+
# An iterable of integer HTTP status codes. ExecuteStatement commands will be retried if these codes are received.
126+
# (defaults to [])
116127
# _socket_timeout
117128
# The timeout in seconds for socket send, recv and connect operations. Should be a positive float or integer.
118129
# (defaults to 900)
130+
# _enable_v3_retries
131+
# Whether to use the DatabricksRetryPolicy implemented in urllib3
132+
# (defaults to False)
119133
# max_download_threads
120134
# Number of threads for handling cloud fetch downloads. Defaults to 10
121135

@@ -166,10 +180,28 @@ def __init__(
166180

167181
self._auth_provider = auth_provider
168182

183+
# Connector version 3 retry approach
184+
self.enable_v3_retries = kwargs.get("_enable_v3_retries", False)
185+
self.force_dangerous_codes = kwargs.get("_retry_dangerous_codes", [])
186+
187+
additional_transport_args = {}
188+
if self.enable_v3_retries:
189+
self.retry_policy = databricks.sql.auth.thrift_http_client.DatabricksRetryPolicy(
190+
delay_min=self._retry_delay_min,
191+
delay_max=self._retry_delay_max,
192+
stop_after_attempts_count=self._retry_stop_after_attempts_count,
193+
stop_after_attempts_duration=self._retry_stop_after_attempts_duration,
194+
delay_default=self._retry_delay_default,
195+
force_dangerous_codes=self.force_dangerous_codes,
196+
)
197+
198+
additional_transport_args["retry_policy"] = self.retry_policy
199+
169200
self._transport = databricks.sql.auth.thrift_http_client.THttpClient(
170201
auth_provider=self._auth_provider,
171202
uri_or_host=uri,
172203
ssl_context=ssl_context,
204+
**additional_transport_args, # type: ignore
173205
)
174206

175207
timeout = kwargs.get("_socket_timeout", DEFAULT_SOCKET_TIMEOUT)
@@ -188,6 +220,7 @@ def __init__(
188220

189221
self._request_lock = threading.RLock()
190222

223+
# TODO: Move this bounding logic into DatabricksRetryPolicy for v3 (PECO-918)
191224
def _initialize_retry_args(self, kwargs):
192225
# Configure retries & timing: use user-settings or defaults, and bound
193226
# by policy. Log.warn when given param gets restricted.
@@ -335,12 +368,17 @@ def attempt_request(attempt):
335368

336369
error, error_message, retry_delay = None, None, None
337370
try:
338-
logger.debug(
339-
"Sending request: {}(<REDACTED>)".format(
340-
getattr(method, "__name__")
341-
)
342-
)
371+
372+
this_method_name = getattr(method, "__name__")
373+
374+
logger.debug("Sending request: {}(<REDACTED>)".format(this_method_name))
343375
unsafe_logger.debug("Sending request: {}".format(request))
376+
377+
# These three lines are no-ops if the v3 retry policy is not in use
378+
this_command_type = CommandType.get(this_method_name)
379+
self._transport.set_retry_command_type(this_command_type)
380+
self._transport.startRetryTimer()
381+
344382
response = method(request)
345383

346384
# Calling `close()` here releases the active HTTP connection back to the pool
@@ -356,9 +394,16 @@ def attempt_request(attempt):
356394
except urllib3.exceptions.HTTPError as err:
357395
# retry on timeout. Happens a lot in Azure and it is safe as data has not been sent to server yet
358396

397+
# TODO: don't use exception handling for GOS polling...
398+
359399
gos_name = TCLIServiceClient.GetOperationStatus.__name__
360400
if method.__name__ == gos_name:
361-
retry_delay = bound_retry_delay(attempt, self._retry_delay_default)
401+
delay_default = (
402+
self.enable_v3_retries
403+
and self.retry_policy.delay_default
404+
or self._retry_delay_default
405+
)
406+
retry_delay = bound_retry_delay(attempt, delay_default)
362407
logger.info(
363408
f"GetOperationStatus failed with HTTP error and will be retried: {str(err)}"
364409
)

src/databricks/sqlalchemy/dialect/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ class DatabricksDialect(default.DefaultDialect):
7474
driver: str = "databricks-sql-python"
7575
default_schema_name: str = "default"
7676

77-
preparer = DatabricksIdentifierPreparer
77+
preparer = DatabricksIdentifierPreparer # type: ignore
7878
type_compiler = DatabricksTypeCompiler
7979
ddl_compiler = DatabricksDDLCompiler
8080
supports_statement_cache: bool = True

0 commit comments

Comments
 (0)