Skip to content

Commit 1fcc3e8

Browse files
authored
Merge branch 'master' into example-fix-314
2 parents 7bb8f24 + dc680eb commit 1fcc3e8

6 files changed

Lines changed: 261 additions & 5 deletions

File tree

pyproject.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,9 @@ test = [
4545
'mypy>=0.800',
4646
]
4747
dev = [
48+
'packaging>=20',
4849
'setuptools>=60',
49-
'Cython~=3.0',
50+
'Cython~=3.1',
5051
]
5152
docs = [
5253
'Sphinx~=4.1.2',
@@ -56,6 +57,7 @@ docs = [
5657

5758
[build-system]
5859
requires = [
60+
"packaging>=20",
5961
"setuptools>=60",
6062
"wheel",
6163
"Cython~=3.1",

setup.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
from setuptools.command.sdist import sdist
2222

2323

24-
CYTHON_DEPENDENCY = 'Cython~=3.0'
24+
CYTHON_DEPENDENCY = 'Cython~=3.1'
2525
MACHINE = platform.machine()
2626
MODULES_CFLAGS = [os.getenv('UVLOOP_OPT_CFLAGS', '-O2')]
2727
_ROOT = pathlib.Path(__file__).parent
@@ -108,7 +108,7 @@ def finalize_options(self):
108108
need_cythonize = True
109109

110110
if need_cythonize:
111-
import pkg_resources
111+
from packaging.requirements import Requirement
112112

113113
# Double check Cython presence in case setup_requires
114114
# didn't go into effect (most likely because someone
@@ -121,8 +121,8 @@ def finalize_options(self):
121121
'please install {} to compile uvloop from source'.format(
122122
CYTHON_DEPENDENCY))
123123

124-
cython_dep = pkg_resources.Requirement.parse(CYTHON_DEPENDENCY)
125-
if Cython.__version__ not in cython_dep:
124+
cython_dep = Requirement(CYTHON_DEPENDENCY)
125+
if not cython_dep.specifier.contains(Cython.__version__):
126126
raise RuntimeError(
127127
'uvloop requires {}, got Cython=={}'.format(
128128
CYTHON_DEPENDENCY, Cython.__version__

tests/test_context.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -474,6 +474,11 @@ def close():
474474
# send data
475475
await self.loop.run_in_executor(None,
476476
ssl_sock.send, b'hello')
477+
# After gh-105836 run_in_executor may resolve without
478+
# yielding. This is very noticeable when PYTHONASYNCIODEBUG
479+
# is set. Hence, we yield explicitly so that the sent data
480+
# can reach the SSL buffer before close/resume_reading.
481+
await asyncio.sleep(0)
477482
# schedule a proactive transport close which will trigger
478483
# the flushing process to retrieve the remaining data
479484
self.loop.call_soon(close)

tests/test_tcp.py

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -737,6 +737,130 @@ async def test():
737737
with s1, s2:
738738
loop.run_until_complete(test())
739739

740+
def test_create_connection_sock_cancel_detaches(self):
741+
async def client(addr):
742+
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
743+
sock.setblocking(False)
744+
try:
745+
sock.connect(addr)
746+
except BlockingIOError:
747+
pass
748+
await asyncio.sleep(0.01)
749+
750+
task = asyncio.ensure_future(
751+
self.loop.create_connection(asyncio.Protocol, sock=sock))
752+
await asyncio.sleep(0)
753+
task.cancel()
754+
with self.assertRaises(asyncio.CancelledError):
755+
await task
756+
757+
# After cancellation the socket must be detached (fd == -1)
758+
# so that its __del__ won't close a recycled fd.
759+
self.assertEqual(sock.fileno(), -1)
760+
761+
def _recv_or_abort(sock):
762+
try:
763+
sock.recv_all(1)
764+
except ConnectionAbortedError:
765+
pass
766+
767+
with self.tcp_server(_recv_or_abort,
768+
max_clients=1,
769+
backlog=1) as srv:
770+
self.loop.run_until_complete(client(srv.addr))
771+
772+
def test_create_connection_sock_cancel_fd_leak(self):
773+
# Regression test for https://github.com/MagicStack/uvloop/issues/645
774+
# and https://github.com/aio-libs/aiohttp/issues/10506
775+
#
776+
# When create_connection(sock=sock) is cancelled, the socket must
777+
# be detached so its close()/`__del__` won't double-close the fd.
778+
# Without the fix, libuv closes the fd but the socket object still
779+
# references it, enabling a chain of fd corruption and data leak:
780+
#
781+
# 1. cancel → libuv closes fd N
782+
# 2. New connection (victim) reuses fd N
783+
# 3. Stale sock.close() closes fd N → breaks the victim
784+
# 4. Another fd N is opened (new connection)
785+
# 5. Victim writev(N) → data goes to the wrong connection
786+
787+
async def test():
788+
srv = await asyncio.start_server(
789+
lambda r, w: w.close(),
790+
'127.0.0.1', 0,
791+
family=socket.AF_INET)
792+
addr = srv.sockets[0].getsockname()
793+
794+
# --- Step 1: create_connection with sock= and cancel it ---
795+
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
796+
sock.setblocking(False)
797+
await self.loop.sock_connect(sock, addr)
798+
stale_fd = sock.fileno()
799+
800+
task = self.loop.create_task(
801+
self.loop.create_connection(asyncio.Protocol, sock=sock)
802+
)
803+
await asyncio.sleep(0)
804+
task.cancel()
805+
with self.assertRaises(asyncio.CancelledError):
806+
await task
807+
808+
# --- Step 2: a victim connection reuses the fd ---
809+
victim_tr, _ = await self.loop.create_connection(
810+
asyncio.Protocol, *addr)
811+
victim_fd = victim_tr.get_extra_info('socket').fileno()
812+
if victim_fd != stale_fd:
813+
victim_tr.close()
814+
sock.close()
815+
srv.close()
816+
await srv.wait_closed()
817+
raise unittest.SkipTest(
818+
f'fd not reused (got {victim_fd}, need {stale_fd})')
819+
820+
# --- Step 3: stale sock.close() must NOT kill the victim ---
821+
# Allocate the socketpair BEFORE sock.close() so the pair
822+
# fds don't collide with stale_fd.
823+
spy_a, spy_b = socket.socketpair()
824+
spy_b.setblocking(False)
825+
826+
sock.close()
827+
828+
# Check whether sock.close() broke the victim's fd.
829+
victim_broken = False
830+
try:
831+
os.fstat(victim_fd)
832+
except OSError:
833+
victim_broken = True
834+
835+
if victim_broken:
836+
# The victim's fd was killed — place a spy socket on
837+
# the freed fd (in production this would be a new
838+
# incoming connection).
839+
os.dup2(spy_a.fileno(), stale_fd)
840+
spy_a.close()
841+
842+
# Victim writes. If victim_broken, writev(stale_fd) goes
843+
# to the spy; otherwise it goes to the real connection.
844+
victim_tr.write(b'LEAKED')
845+
846+
try:
847+
leaked = spy_b.recv(4096)
848+
except BlockingIOError:
849+
leaked = b''
850+
851+
if victim_broken:
852+
os.close(stale_fd)
853+
spy_b.close()
854+
victim_tr.close()
855+
srv.close()
856+
await srv.wait_closed()
857+
858+
self.assertEqual(leaked, b'',
859+
f"Data leaked to an unrelated socket: "
860+
f"got {leaked!r}")
861+
862+
self.loop.run_until_complete(test())
863+
740864

741865
class Test_UV_TCP(_TestTCP, tb.UVTestCase):
742866

tests/test_unix.py

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,117 @@ def test_create_unix_connection_6(self):
404404
lambda: None, path='/tmp/a',
405405
ssl_handshake_timeout=SSL_HANDSHAKE_TIMEOUT))
406406

407+
def test_create_unix_connection_sock_cancel_detaches(self):
408+
async def test():
409+
srv_path = os.path.join(tempfile.mkdtemp(), 'test.sock')
410+
srv = await asyncio.start_unix_server(
411+
lambda r, w: w.close(), path=srv_path)
412+
413+
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
414+
sock.setblocking(False)
415+
try:
416+
sock.connect(srv_path)
417+
except BlockingIOError:
418+
pass
419+
await asyncio.sleep(0.01)
420+
421+
task = asyncio.ensure_future(
422+
self.loop.create_unix_connection(
423+
asyncio.Protocol, sock=sock))
424+
await asyncio.sleep(0)
425+
task.cancel()
426+
with self.assertRaises(asyncio.CancelledError):
427+
await task
428+
429+
self.assertEqual(sock.fileno(), -1)
430+
431+
srv.close()
432+
await srv.wait_closed()
433+
if os.path.exists(srv_path):
434+
os.unlink(srv_path)
435+
436+
self.loop.run_until_complete(test())
437+
438+
def test_create_unix_connection_sock_cancel_fd_leak(self):
439+
# Same as test_create_connection_sock_cancel_fd_leak but for
440+
# the create_unix_connection(sock=) path.
441+
442+
async def test():
443+
srv_path = os.path.join(tempfile.mkdtemp(), 'test.sock')
444+
srv = await asyncio.start_unix_server(
445+
lambda r, w: w.close(), path=srv_path)
446+
447+
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
448+
sock.setblocking(False)
449+
await self.loop.sock_connect(sock, srv_path)
450+
stale_fd = sock.fileno()
451+
452+
task = self.loop.create_task(
453+
self.loop.create_unix_connection(
454+
asyncio.Protocol, sock=sock))
455+
await asyncio.sleep(0)
456+
task.cancel()
457+
with self.assertRaises(asyncio.CancelledError):
458+
await task
459+
460+
# Create victim that reuses the fd.
461+
victim_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
462+
victim_sock.setblocking(False)
463+
await self.loop.sock_connect(victim_sock, srv_path)
464+
victim_tr, _ = await self.loop.create_unix_connection(
465+
asyncio.Protocol, sock=victim_sock)
466+
victim_fd = victim_tr.get_extra_info('socket').fileno()
467+
if victim_fd != stale_fd:
468+
victim_tr.close()
469+
sock.close()
470+
srv.close()
471+
await srv.wait_closed()
472+
if os.path.exists(srv_path):
473+
os.unlink(srv_path)
474+
raise unittest.SkipTest(
475+
f'fd not reused (got {victim_fd}, need {stale_fd})')
476+
477+
spy_a, spy_b = socket.socketpair()
478+
spy_b.setblocking(False)
479+
480+
sock.close()
481+
482+
victim_broken = False
483+
try:
484+
os.fstat(victim_fd)
485+
except OSError:
486+
victim_broken = True
487+
488+
if victim_broken:
489+
os.dup2(spy_a.fileno(), stale_fd)
490+
spy_a.close()
491+
492+
victim_tr.write(b'LEAKED')
493+
494+
try:
495+
leaked = spy_b.recv(4096)
496+
except BlockingIOError:
497+
leaked = b''
498+
499+
if victim_broken:
500+
os.close(stale_fd)
501+
spy_b.close()
502+
victim_tr.close()
503+
# Let pending callbacks (e.g. server-side connection_lost
504+
# from the cancelled connection) run before closing the
505+
# server, to avoid triggering call_exception_handler().
506+
await asyncio.sleep(0)
507+
srv.close()
508+
await srv.wait_closed()
509+
if os.path.exists(srv_path):
510+
os.unlink(srv_path)
511+
512+
self.assertEqual(leaked, b'',
513+
f"Data leaked to an unrelated socket: "
514+
f"got {leaked!r}")
515+
516+
self.loop.run_until_complete(test())
517+
407518

408519
class Test_UV_Unix(_TestUnix, tb.UVTestCase):
409520

uvloop/loop.pyx

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2053,6 +2053,9 @@ cdef class Loop:
20532053
tr = TCPTransport.new(self, protocol, None, waiter, context)
20542054
try:
20552055
# libuv will make socket non-blocking
2056+
# We are not detaching the PSO from the now-libuv-managed
2057+
# FD here because of:
2058+
# https://github.com/python/asyncio/pull/449
20562059
tr._open(sock.fileno())
20572060
tr._init_protocol()
20582061
await waiter
@@ -2065,6 +2068,15 @@ cdef class Loop:
20652068
# up in `Transport._call_connection_made()`, and calling
20662069
# `_close()` before it is fine.
20672070
tr._close()
2071+
# Fix for:
2072+
# * https://github.com/MagicStack/uvloop/issues/645
2073+
# * https://github.com/MagicStack/uvloop/issues/738
2074+
# The underlying FD is closed in tr._close(), the owner of
2075+
# `sock` must not get a chance to double-close the same FD
2076+
# sometime later, because that FD may be reused by a new
2077+
# connection under load. So we detach the PSO from the
2078+
# already-closed FD here.
2079+
sock.detach()
20682080
raise
20692081

20702082
tr._attach_fileobj(sock)
@@ -2306,7 +2318,9 @@ cdef class Loop:
23062318
except (KeyboardInterrupt, SystemExit):
23072319
raise
23082320
except BaseException:
2321+
# See comments in create_connection() for more information
23092322
tr._close()
2323+
sock.detach()
23102324
raise
23112325

23122326
tr._attach_fileobj(sock)

0 commit comments

Comments
 (0)