Skip to content

Commit b3b9a86

Browse files
committed
Fix _next_afi_sockaddr crash; revert Socks5Wrapper change; reset _socks5_proxy in close()
1 parent f3d9fb2 commit b3b9a86

File tree

1 file changed

+7
-5
lines changed

1 file changed

+7
-5
lines changed

kafka/conn.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -326,8 +326,9 @@ def _dns_lookup(self):
326326
return True
327327

328328
def _next_afi_sockaddr(self):
329-
if self._socks5_proxy.use_remote_lookup():
329+
if self._socks5_proxy and self._socks5_proxy.use_remote_lookup():
330330
return (socket.AF_UNSPEC, (self.host, self.port))
331+
331332
if not self._gai:
332333
if not self._dns_lookup():
333334
return
@@ -368,9 +369,6 @@ def connect_blocking(self, timeout=float('inf')):
368369

369370
def connect(self):
370371
"""Attempt to connect and return ConnectionState"""
371-
if self.config["socks5_proxy"] is not None and self._socks5_proxy is None:
372-
self._socks5_proxy = Socks5Wrapper(self.config["socks5_proxy"], self.afi)
373-
374372
if self.state is ConnectionStates.DISCONNECTED and not self.blacked_out():
375373
self.state = ConnectionStates.CONNECTING
376374
self.last_attempt = time.time()
@@ -384,6 +382,7 @@ def connect(self):
384382
self._sock_afi, self._sock_addr = next_lookup
385383
try:
386384
if self.config["socks5_proxy"] is not None:
385+
self._socks5_proxy = Socks5Wrapper(self.config["socks5_proxy"], self.afi)
387386
self._sock = self._socks5_proxy.socket(self._sock_afi, socket.SOCK_STREAM)
388387
else:
389388
self._sock = socket.socket(self._sock_afi, socket.SOCK_STREAM)
@@ -866,7 +865,9 @@ def connection_delay(self):
866865
large number to handle slow/stalled connections.
867866
"""
868867
if self.disconnected() or self.connecting():
869-
if len(self._gai) > 0 or (self._socks5_proxy is not None and self._socks5_proxy.use_remote_lookup()):
868+
if len(self._gai) > 0:
869+
return 0
870+
elif self._socks5_proxy and self._socks5_proxy.use_remote_lookup():
870871
return 0
871872
else:
872873
time_waited = time.time() - self.last_attempt
@@ -968,6 +969,7 @@ def close(self, error=None):
968969
# the socket fd from selectors cleanly.
969970
sock = self._sock
970971
self._sock = None
972+
self._socks5_proxy = None
971973

972974
# drop lock before state change callback and processing futures
973975
self.config['state_change_callback'](self.node_id, sock, self)

0 commit comments

Comments
 (0)