Skip to content

Commit be54958

Browse files
committed
Ensure we don't hang in Socket#accept due to spurious readiness.
It's possible, especially on dual stack, to have issues where a server may become readable, but by the time accept is called, the connection is gone. This can cause a deadlock between the semaphore and the accept call, which can hang indefinitely.
1 parent a6b9bd9 commit be54958

File tree

2 files changed

+55
-14
lines changed

2 files changed

+55
-14
lines changed

examples/limited/config.ru

+2-4
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@ run do |env|
2020
# There is no guarantee that there is a connection or that the connection has a token:
2121
token = limited_semaphore_token(request)
2222

23-
Console.info(self, "Sleeping 10 seconds", token: token)
24-
2523
if env["PATH_INFO"] == "/fast"
2624
if token
2725
# Keeping the connection alive here is problematic because if the next request is slow, it will "block the server" since we have relinquished the token already.
@@ -30,10 +28,10 @@ run do |env|
3028
end
3129

3230
# Simulated "fast / non-blocking" request:
33-
sleep(1)
31+
sleep(0.01)
3432
else
3533
# Simulated "slow / blocking" request:
36-
sleep(10)
34+
sleep(0.1)
3735
end
3836

3937
[200, {}, ["Hello World"]]

examples/limited/limited.rb

+53-10
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
# frozen_string_literal: true
2+
13
module Limited
24
# Thread local storage for the semaphore (per-worker):
35
Thread.attr_accessor :limited_semaphore
@@ -12,21 +14,32 @@ def self.instance
1214
# Create a new semaphore with the given limit.
1315
def initialize(limit = 1)
1416
@queue = Thread::Queue.new
17+
18+
Console.debug(self, "Initializing queue...", limit: limit)
1519
limit.times{release}
1620
end
1721

1822
# Release the semaphore.
1923
def release
24+
Console.debug(self, "Releasing semaphore...")
2025
@queue.push(true)
2126
end
2227

2328
# Acquire the semaphore. May block until the semaphore is available.
2429
def acquire
30+
Console.debug(self, "Acquiring semaphore...")
2531
@queue.pop
26-
32+
Console.debug(self, "Acquired semaphore...")
33+
2734
return Token.new(self)
2835
end
2936

37+
def try_acquire
38+
if @queue.pop(timeout: 0)
39+
return Token.new(self)
40+
end
41+
end
42+
3043
# A token that can be used to release the semaphore once and once only.
3144
class Token
3245
def initialize(semaphore)
@@ -44,19 +57,46 @@ def release
4457

4558
# A wrapper implementation for the endpoint that limits the number of connections that can be accepted.
4659
class Wrapper < IO::Endpoint::Wrapper
47-
def socket_accept(server)
60+
# Wait for an inbound connection to be ready to be accepted.
61+
def wait_for_inbound_connection(server)
4862
semaphore = Semaphore.instance
4963

5064
# Wait until there is a connection ready to be accepted:
51-
server.wait_readable
65+
while true
66+
server.wait_readable
5267

53-
# Acquire the semaphore:
54-
Console.info(self, "Acquiring semaphore...")
55-
token = semaphore.acquire
68+
# Acquire the semaphore:
69+
if token = semaphore.acquire
70+
return token
71+
end
72+
end
73+
end
74+
75+
# Once the server is readable and we've acquired the token, we can accept the connection (if it's still there).
76+
def socket_accept_nonblock(server, token)
77+
result = server.accept_nonblock
5678

57-
# Accept the connection:
58-
socket, address = super
59-
Console.info(self, "Accepted connection from #{address.inspect}", socket: socket)
79+
success = true
80+
return result
81+
rescue IO::WaitReadable
82+
return nil
83+
ensure
84+
token.release unless success
85+
end
86+
87+
# Accept a connection from the server, limited by the per-worker (thread or process) semaphore.
88+
def socket_accept(server)
89+
while true
90+
if token = wait_for_inbound_connection(server)
91+
# In principle, there is a connection ready to be accepted:
92+
socket, address = socket_accept_nonblock(server, token)
93+
94+
if socket
95+
Console.debug(self, "Accepted connection from #{address.inspect}", socket: socket)
96+
break
97+
end
98+
end
99+
end
60100

61101
# Provide access to the token, so that the connection limit could be released prematurely if it is determined that the request will not overload the server:
62102
socket.define_singleton_method :token do
@@ -67,11 +107,14 @@ def socket_accept(server)
67107
socket.define_singleton_method :close do
68108
super()
69109
ensure
70-
Console.info(self, "Closing connection from #{address.inspect}", socket: socket)
110+
Console.debug(self, "Releasing connection from #{address.inspect}", socket: socket)
71111
token.release
72112
end
73113

114+
success = true
74115
return socket, address
116+
ensure
117+
token&.release unless success
75118
end
76119
end
77120
end

0 commit comments

Comments
 (0)