Skip to content

Commit 66963be

Browse files
authored
Fix bolt handshake not having a timeout (#915)
This is a backport of #905
1 parent 98dfae9 commit 66963be

File tree

3 files changed

+60
-53
lines changed

3 files changed

+60
-53
lines changed

neo4j/io/__init__.py

+17-24
Original file line numberDiff line numberDiff line change
@@ -279,16 +279,19 @@ def get_handshake(cls):
279279
return b"".join(version.to_bytes() for version in offered_versions).ljust(16, b"\x00")
280280

281281
@classmethod
282-
def ping(cls, address, *, timeout=None, pool_config=None):
282+
def ping(cls, address, *, deadline=None, pool_config=None):
283283
""" Attempt to establish a Bolt connection, returning the
284284
agreed Bolt protocol version if successful.
285285
"""
286286
if pool_config is None:
287287
pool_config = PoolConfig()
288+
if deadline is None:
289+
deadline = Deadline(None)
288290
try:
289291
s, protocol_version, handshake, data = BoltSocket.connect(
290292
address,
291-
timeout=timeout,
293+
tcp_timeout=pool_config.connection_timeout,
294+
deadline=deadline,
292295
custom_resolver=pool_config.resolver,
293296
ssl_context=pool_config.get_ssl_context(),
294297
keep_alive=pool_config.keep_alive,
@@ -300,38 +303,30 @@ def ping(cls, address, *, timeout=None, pool_config=None):
300303
return protocol_version
301304

302305
@classmethod
303-
def open(cls, address, *, auth=None, timeout=None, routing_context=None,
306+
def open(cls, address, *, auth=None, deadline=None, routing_context=None,
304307
pool_config=None):
305308
""" Open a new Bolt connection to a given server address.
306309
307310
:param address:
308311
:param auth:
309-
:param timeout: the connection timeout in seconds
312+
:param deadline: how long to wait for the connection to be established
310313
:param routing_context: dict containing routing context
311314
:param pool_config:
312315
:return:
313316
:raise BoltHandshakeError: raised if the Bolt Protocol can not negotiate a protocol version.
314317
:raise ServiceUnavailable: raised if there was a connection issue.
315318
"""
316-
def time_remaining():
317-
if timeout is None:
318-
return None
319-
t = timeout - (perf_counter() - t0)
320-
return t if t > 0 else 0
321319

322-
t0 = perf_counter()
323320
if pool_config is None:
324321
pool_config = PoolConfig()
322+
if deadline is None:
323+
deadline = Deadline(None)
325324

326325
socket_connection_timeout = pool_config.connection_timeout
327-
if socket_connection_timeout is None:
328-
socket_connection_timeout = time_remaining()
329-
elif timeout is not None:
330-
socket_connection_timeout = min(pool_config.connection_timeout,
331-
time_remaining())
332326
s, pool_config.protocol_version, handshake, data = BoltSocket.connect(
333327
address,
334-
timeout=socket_connection_timeout,
328+
tcp_timeout=pool_config.connection_timeout,
329+
deadline=deadline,
335330
custom_resolver=pool_config.resolver,
336331
ssl_context=pool_config.get_ssl_context(),
337332
keep_alive=pool_config.keep_alive,
@@ -370,7 +365,7 @@ def time_remaining():
370365
)
371366

372367
try:
373-
connection.socket.set_deadline(time_remaining())
368+
connection.socket.set_deadline(deadline)
374369
try:
375370
connection.hello()
376371
finally:
@@ -732,9 +727,7 @@ def connection_creator():
732727
released_reservation = False
733728
try:
734729
try:
735-
connection = self.opener(
736-
address, deadline.to_timeout()
737-
)
730+
connection = self.opener(address, deadline)
738731
except ServiceUnavailable:
739732
self.deactivate(address)
740733
raise
@@ -909,9 +902,9 @@ def open(cls, address, *, auth, pool_config, workspace_config):
909902
:return: BoltPool
910903
"""
911904

912-
def opener(addr, timeout):
905+
def opener(addr, deadline):
913906
return Bolt.open(
914-
addr, auth=auth, timeout=timeout, routing_context=None,
907+
addr, auth=auth, deadline=deadline, routing_context=None,
915908
pool_config=pool_config
916909
)
917910

@@ -955,8 +948,8 @@ def open(cls, *addresses, auth, pool_config, workspace_config, routing_context=N
955948
raise ConfigurationError("The key 'address' is reserved for routing context.")
956949
routing_context["address"] = str(address)
957950

958-
def opener(addr, timeout):
959-
return Bolt.open(addr, auth=auth, timeout=timeout,
951+
def opener(addr, deadline):
952+
return Bolt.open(addr, auth=auth, deadline=deadline,
960953
routing_context=routing_context,
961954
pool_config=pool_config)
962955

neo4j/io/_socket.py

+39-25
Original file line numberDiff line numberDiff line change
@@ -190,11 +190,12 @@ def _secure(cls, s, host, ssl_context):
190190
return s
191191

192192
@classmethod
193-
def _handshake(cls, s, resolved_address):
193+
def _handshake(cls, s, resolved_address, deadline):
194194
"""
195195
196196
:param s: Socket
197197
:param resolved_address:
198+
:param deadline:
198199
199200
:return: (socket, version, client_handshake, server_response_data)
200201
"""
@@ -214,46 +215,52 @@ def _handshake(cls, s, resolved_address):
214215
log.debug("[#%04X] C: <HANDSHAKE> %s %s %s %s", local_port,
215216
*supported_versions)
216217

217-
data = cls.Bolt.MAGIC_PREAMBLE + cls.Bolt.get_handshake()
218-
s.sendall(data)
218+
request = cls.Bolt.MAGIC_PREAMBLE + cls.Bolt.get_handshake()
219219

220220
# Handle the handshake response
221-
ready_to_read = False
222-
with selectors.DefaultSelector() as selector:
223-
selector.register(s, selectors.EVENT_READ)
224-
selector.select(1)
221+
original_timeout = s.gettimeout()
222+
s.settimeout(deadline.to_timeout())
225223
try:
226-
data = s.recv(4)
227-
except OSError:
224+
s.sendall(request)
225+
response = s.recv(4)
226+
except OSError as exc:
228227
raise ServiceUnavailable(
229-
"Failed to read any data from server {!r} "
230-
"after connected".format(resolved_address))
231-
data_size = len(data)
228+
f"Failed to read any data from server {resolved_address!r} "
229+
f"after connected (deadline {deadline})"
230+
) from exc
231+
finally:
232+
s.settimeout(original_timeout)
233+
data_size = len(response)
232234
if data_size == 0:
233235
# If no data is returned after a successful select
234236
# response, the server has closed the connection
235237
log.debug("[#%04X] S: <CLOSE>", local_port)
236238
cls.close_socket(s)
237239
raise ServiceUnavailable(
238-
"Connection to {address} closed without handshake response".format(
239-
address=resolved_address))
240+
f"Connection to {resolved_address} closed without handshake "
241+
"response"
242+
)
240243
if data_size != 4:
241244
# Some garbled data has been received
242245
log.debug("[#%04X] S: @*#!", local_port)
243246
cls.close_socket(s)
244247
raise BoltProtocolError(
245-
"Expected four byte Bolt handshake response from %r, received %r instead; check for incorrect port number" % (
246-
resolved_address, data), address=resolved_address)
247-
elif data == b"HTTP":
248+
"Expected four byte Bolt handshake response from "
249+
f"{resolved_address!r}, received {response!r} instead; "
250+
"check for incorrect port number"
251+
, address=resolved_address
252+
)
253+
elif response == b"HTTP":
248254
log.debug("[#%04X] S: <CLOSE>", local_port)
249255
cls.close_socket(s)
250256
raise ServiceUnavailable(
251-
"Cannot to connect to Bolt service on {!r} "
252-
"(looks like HTTP)".format(resolved_address))
253-
agreed_version = data[-1], data[-2]
257+
f"Cannot to connect to Bolt service on {resolved_address!r} "
258+
"(looks like HTTP)"
259+
)
260+
agreed_version = response[-1], response[-2]
254261
log.debug("[#%04X] S: <HANDSHAKE> 0x%06X%02X", local_port,
255262
agreed_version[1], agreed_version[0])
256-
return cls(s), agreed_version, handshake, data
263+
return cls(s), agreed_version, handshake, response
257264

258265
@classmethod
259266
def close_socket(cls, socket_):
@@ -269,8 +276,8 @@ def close_socket(cls, socket_):
269276
pass
270277

271278
@classmethod
272-
def connect(cls, address, *, timeout, custom_resolver, ssl_context,
273-
keep_alive):
279+
def connect(cls, address, *, tcp_timeout, deadline, custom_resolver,
280+
ssl_context, keep_alive):
274281
""" Connect and perform a handshake and return a valid Connection object,
275282
assuming a protocol version can be agreed.
276283
"""
@@ -281,12 +288,19 @@ def connect(cls, address, *, timeout, custom_resolver, ssl_context,
281288

282289
resolved_addresses = Address(address).resolve(resolver=custom_resolver)
283290
for resolved_address in resolved_addresses:
291+
deadline_timeout = deadline.to_timeout()
292+
if (
293+
deadline_timeout is not None
294+
and deadline_timeout <= tcp_timeout
295+
):
296+
tcp_timeout = deadline_timeout
284297
s = None
285298
try:
286-
s = BoltSocket._connect(resolved_address, timeout, keep_alive)
299+
s = BoltSocket._connect(resolved_address, tcp_timeout,
300+
keep_alive)
287301
s = BoltSocket._secure(s, resolved_address.host_name,
288302
ssl_context)
289-
return BoltSocket._handshake(s, resolved_address)
303+
return BoltSocket._handshake(s, resolved_address, deadline)
290304
except (BoltError, DriverError, OSError) as error:
291305
try:
292306
local_port = s.getsockname()[1]

tests/unit/io/test_direct.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -113,19 +113,19 @@ class BoltTestCase(TestCase):
113113

114114
def test_open(self):
115115
with pytest.raises(ServiceUnavailable):
116-
connection = Bolt.open(("localhost", 9999), auth=("test", "test"))
116+
Bolt.open(("localhost", 9999), auth=("test", "test"))
117117

118118
def test_open_timeout(self):
119-
conf = PoolConfig()
120119
with pytest.raises(ServiceUnavailable):
121-
connection = Bolt.open(("localhost", 9999), auth=("test", "test"), timeout=1)
120+
Bolt.open(("localhost", 9999), auth=("test", "test"),
121+
deadline=Deadline(1))
122122

123123
def test_ping(self):
124124
protocol_version = Bolt.ping(("localhost", 9999))
125125
assert protocol_version is None
126126

127127
def test_ping_timeout(self):
128-
protocol_version = Bolt.ping(("localhost", 9999), timeout=1)
128+
protocol_version = Bolt.ping(("localhost", 9999), deadline=Deadline(1))
129129
assert protocol_version is None
130130

131131

0 commit comments

Comments
 (0)