28
28
Union ,
29
29
)
30
30
31
- from pymongo import ssl_support
31
+ from pymongo import _csot , ssl_support
32
32
from pymongo ._asyncio_task import create_task
33
33
from pymongo .errors import _OperationCancelled
34
34
from pymongo .socket_checker import _errno_from_exception
@@ -316,6 +316,42 @@ async def _async_receive(conn: socket.socket, length: int, loop: AbstractEventLo
316
316
return mv
317
317
318
318
319
+ _PYPY = "PyPy" in sys .version
320
+
321
+
322
+ def wait_for_read (conn : Connection , deadline : Optional [float ]) -> None :
323
+ """Block until at least one byte is read, or a timeout, or a cancel."""
324
+ sock = conn .conn
325
+ timed_out = False
326
+ # Check if the connection's socket has been manually closed
327
+ if sock .fileno () == - 1 :
328
+ return
329
+ while True :
330
+ # SSLSocket can have buffered data which won't be caught by select.
331
+ if hasattr (sock , "pending" ) and sock .pending () > 0 :
332
+ readable = True
333
+ else :
334
+ # Wait up to 500ms for the socket to become readable and then
335
+ # check for cancellation.
336
+ if deadline :
337
+ remaining = deadline - time .monotonic ()
338
+ # When the timeout has expired perform one final check to
339
+ # see if the socket is readable. This helps avoid spurious
340
+ # timeouts on AWS Lambda and other FaaS environments.
341
+ if remaining <= 0 :
342
+ timed_out = True
343
+ timeout = max (min (remaining , _POLL_TIMEOUT ), 0 )
344
+ else :
345
+ timeout = _POLL_TIMEOUT
346
+ readable = conn .socket_checker .select (sock , read = True , timeout = timeout )
347
+ if conn .cancel_context .cancelled :
348
+ raise _OperationCancelled ("operation cancelled" )
349
+ if readable :
350
+ return
351
+ if timed_out :
352
+ raise socket .timeout ("timed out" )
353
+
354
+
319
355
def receive_data (conn : Connection , length : int , deadline : Optional [float ]) -> memoryview :
320
356
buf = bytearray (length )
321
357
mv = memoryview (buf )
@@ -324,18 +360,25 @@ def receive_data(conn: Connection, length: int, deadline: Optional[float]) -> me
324
360
# check for the cancellation signal after each timeout. Alternatively we
325
361
# could close the socket but that does not reliably cancel recv() calls
326
362
# on all OSes.
363
+ # When the timeout has expired we perform one final non-blocking recv.
364
+ # This helps avoid spurious timeouts when the response is actually already
365
+ # buffered on the client.
327
366
orig_timeout = conn .conn .gettimeout ()
328
367
try :
329
368
while bytes_read < length :
330
- if deadline is not None :
331
- # CSOT: Update timeout. When the timeout has expired perform one
332
- # final non-blocking recv. This helps avoid spurious timeouts when
333
- # the response is actually already buffered on the client.
334
- short_timeout = min (max (deadline - time .monotonic (), 0 ), _POLL_TIMEOUT )
335
- else :
336
- short_timeout = _POLL_TIMEOUT
337
- conn .set_conn_timeout (short_timeout )
338
369
try :
370
+ # Use the legacy wait_for_read cancellation approach on PyPy due to PYTHON-5011.
371
+ if _PYPY :
372
+ wait_for_read (conn , deadline )
373
+ if _csot .get_timeout () and deadline is not None :
374
+ conn .set_conn_timeout (max (deadline - time .monotonic (), 0 ))
375
+ else :
376
+ if deadline is not None :
377
+ short_timeout = min (max (deadline - time .monotonic (), 0 ), _POLL_TIMEOUT )
378
+ else :
379
+ short_timeout = _POLL_TIMEOUT
380
+ conn .set_conn_timeout (short_timeout )
381
+
339
382
chunk_length = conn .conn .recv_into (mv [bytes_read :])
340
383
except BLOCKING_IO_ERRORS :
341
384
if conn .cancel_context .cancelled :
@@ -345,6 +388,9 @@ def receive_data(conn: Connection, length: int, deadline: Optional[float]) -> me
345
388
except socket .timeout :
346
389
if conn .cancel_context .cancelled :
347
390
raise _OperationCancelled ("operation cancelled" ) from None
391
+ if _PYPY :
392
+ # We reached the true deadline.
393
+ raise
348
394
continue
349
395
except OSError as exc :
350
396
if conn .cancel_context .cancelled :
0 commit comments