Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(core): Adjust connection timeout and retry logic #685

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions kazoo/protocol/connection.py
Original file line number Diff line number Diff line change
@@ -612,7 +612,9 @@ def _connect_attempt(self, host, hostip, port, retry):

try:
self._xid = 0
read_timeout, connect_timeout = self._connect(host, hostip, port)
read_timeout, connect_timeout = self._connect(
host, hostip, port, timeout=retry.cur_delay
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic looks good, but the default initial _cur_delay is 100ms. Do we really want to keep it that low?

)
read_timeout = read_timeout / 1000.0
connect_timeout = connect_timeout / 1000.0
retry.reset()
@@ -683,7 +685,7 @@ def _connect_attempt(self, host, hostip, port, retry):
if self._socket is not None:
self._socket.close()

def _connect(self, host, hostip, port):
def _connect(self, host, hostip, port, timeout):
client = self.client
self.logger.info(
"Connecting to %s(%s):%s, use_ssl: %r",
@@ -703,7 +705,7 @@ def _connect(self, host, hostip, port):
with self._socket_error_handling():
self._socket = self.handler.create_connection(
address=(hostip, port),
timeout=client._session_timeout / 1000.0,
timeout=timeout,
use_ssl=self.client.use_ssl,
keyfile=self.client.keyfile,
certfile=self.client.certfile,
4 changes: 4 additions & 0 deletions kazoo/retry.py
Original file line number Diff line number Diff line change
@@ -109,6 +109,10 @@ def copy(self):
obj.retry_exceptions = self.retry_exceptions
return obj

@property
def cur_delay(self):
return self._cur_delay

def __call__(self, func, *args, **kwargs):
"""Call a function with arguments until it completes without
throwing a Kazoo exception
106 changes: 106 additions & 0 deletions kazoo/tests/test_cnx_retry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
from unittest import mock

import pytest

from kazoo import retry
from kazoo.handlers import threading
from kazoo.protocol import connection
from kazoo.protocol import states


@mock.patch("kazoo.protocol.connection.ConnectionHandler._expand_client_hosts")
def test_retry_logic(mock_expand):
mock_client = mock.Mock()
mock_client._state = states.KeeperState.CLOSED
mock_client._session_id = None
mock_client._session_passwd = b"\x00" * 16
mock_client._stopped.is_set.return_value = False
mock_client.handler.timeout_exception = threading.KazooTimeoutError
mock_client.handler.create_connection.side_effect = (
threading.KazooTimeoutError()
)
test_retry = retry.KazooRetry(
max_tries=6,
delay=1.0,
backoff=2,
max_delay=30.0,
max_jitter=0.0,
sleep_func=lambda _x: None,
)
test_cnx = connection.ConnectionHandler(
client=mock_client,
retry_sleeper=test_retry,
)
mock_expand.return_value = [
("a", "1.1.1.1", 2181),
("b", "2.2.2.2", 2181),
("c", "3.3.3.3", 2181),
]

with pytest.raises(retry.RetryFailedError):
test_retry(test_cnx._connect_loop, test_retry)

assert mock_client.handler.create_connection.call_args_list[:3] == [
mock.call(
address=("1.1.1.1", 2181),
timeout=1.0,
use_ssl=mock.ANY,
keyfile=mock.ANY,
certfile=mock.ANY,
ca=mock.ANY,
keyfile_password=mock.ANY,
verify_certs=mock.ANY,
),
mock.call(
address=("2.2.2.2", 2181),
timeout=1.0,
use_ssl=mock.ANY,
keyfile=mock.ANY,
certfile=mock.ANY,
ca=mock.ANY,
keyfile_password=mock.ANY,
verify_certs=mock.ANY,
),
mock.call(
address=("3.3.3.3", 2181),
timeout=1.0,
use_ssl=mock.ANY,
keyfile=mock.ANY,
certfile=mock.ANY,
ca=mock.ANY,
keyfile_password=mock.ANY,
verify_certs=mock.ANY,
),
], "All hosts are first tried with the lowest timeout value"
assert mock_client.handler.create_connection.call_args_list[-3:] == [
mock.call(
address=("1.1.1.1", 2181),
timeout=30.0,
use_ssl=mock.ANY,
keyfile=mock.ANY,
certfile=mock.ANY,
ca=mock.ANY,
keyfile_password=mock.ANY,
verify_certs=mock.ANY,
),
mock.call(
address=("2.2.2.2", 2181),
timeout=30.0,
use_ssl=mock.ANY,
keyfile=mock.ANY,
certfile=mock.ANY,
ca=mock.ANY,
keyfile_password=mock.ANY,
verify_certs=mock.ANY,
),
mock.call(
address=("3.3.3.3", 2181),
timeout=30.0,
use_ssl=mock.ANY,
keyfile=mock.ANY,
certfile=mock.ANY,
ca=mock.ANY,
keyfile_password=mock.ANY,
verify_certs=mock.ANY,
),
], "All hosts are last tried with the highest timeout value"
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -123,6 +123,7 @@ module = [
'kazoo.tests.test_cache',
'kazoo.tests.test_client',
'kazoo.tests.test_connection',
'kazoo.tests.test_cnx_retry',
'kazoo.tests.test_counter',
'kazoo.tests.test_election',
'kazoo.tests.test_eventlet_handler',