Skip to content

Commit 9739e9a

Browse files
committed
Asynchronous pruning for RubyThreadPoolExecutor
1 parent 2aa6f64 commit 9739e9a

8 files changed

+152
-93
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
module Concurrent
2+
module Collection
3+
# @!visibility private
4+
# @!macro ruby_timeout_queue
5+
class RubyTimeoutQueue < ::Queue
6+
def initialize(*args)
7+
if RUBY_VERSION >= '3.2'
8+
raise "#{self.class.name} is not needed on Ruby 3.2 or later, use ::Queue instead"
9+
end
10+
11+
super(*args)
12+
13+
@mutex = Mutex.new
14+
@cond_var = ConditionVariable.new
15+
end
16+
17+
def push(obj)
18+
@mutex.synchronize do
19+
super(obj)
20+
@cond_var.signal
21+
end
22+
end
23+
alias_method :enq, :push
24+
alias_method :<<, :push
25+
26+
def pop(non_block = false, timeout: nil)
27+
if non_block && timeout
28+
raise ArgumentError, "can't set a timeout if non_block is enabled"
29+
end
30+
31+
if non_block
32+
super(true)
33+
elsif timeout
34+
@mutex.synchronize do
35+
deadline = Concurrent.monotonic_time + timeout
36+
while (now = Concurrent.monotonic_time) < deadline # handle spurious wakeups
37+
begin
38+
return super(true)
39+
rescue ThreadError
40+
# empty, wait until woken up
41+
end
42+
43+
@cond_var.wait(@mutex, deadline - now)
44+
end
45+
46+
nil
47+
end
48+
else
49+
super(false)
50+
end
51+
end
52+
alias_method :deq, :pop
53+
alias_method :shift, :pop
54+
end
55+
private_constant :RubyTimeoutQueue
56+
end
57+
end
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
module Concurrent
2+
module Collection
3+
# @!visibility private
4+
# @!macro internal_implementation_note
5+
TimeoutQueueImplementation = if RUBY_VERSION >= '3.2'
6+
::Queue
7+
else
8+
require 'concurrent/collection/ruby_timeout_queue'
9+
RubyTimeoutQueue
10+
end
11+
private_constant :TimeoutQueueImplementation
12+
13+
# @!visibility private
14+
# @!macro timeout_queue
15+
class TimeoutQueue < TimeoutQueueImplementation
16+
end
17+
end
18+
end

lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb

+2-4
Original file line numberDiff line numberDiff line change
@@ -81,10 +81,8 @@ module Concurrent
8181
# What is being pruned is controlled by the min_threads and idletime
8282
# parameters passed at pool creation time
8383
#
84-
# This is a no-op on some pool implementation (e.g. the Java one). The Ruby
85-
# pool will auto-prune each time a new job is posted. You will need to call
86-
# this method explicitly in case your application post jobs in bursts (a
87-
# lot of jobs and then nothing for long periods)
84+
# This is a no-op on all pool implementations as they prune themselves
85+
# automatically, and has been deprecated.
8886

8987
# @!macro thread_pool_executor_public_api
9088
#

lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb

+2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ module Concurrent
88
# @!macro thread_pool_options
99
# @!visibility private
1010
class JavaThreadPoolExecutor < JavaExecutorService
11+
include Concern::Deprecation
1112

1213
# @!macro thread_pool_executor_constant_default_max_pool_size
1314
DEFAULT_MAX_POOL_SIZE = java.lang.Integer::MAX_VALUE # 2147483647
@@ -100,6 +101,7 @@ def running?
100101

101102
# @!macro thread_pool_executor_method_prune_pool
102103
def prune_pool
104+
deprecated "#prune_pool has no effect and will be removed in the next release."
103105
end
104106

105107
private

lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb

+42-32
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,15 @@
33
require 'concurrent/concern/logging'
44
require 'concurrent/executor/ruby_executor_service'
55
require 'concurrent/utility/monotonic_time'
6+
require 'concurrent/collection/timeout_queue'
67

78
module Concurrent
89

910
# @!macro thread_pool_executor
1011
# @!macro thread_pool_options
1112
# @!visibility private
1213
class RubyThreadPoolExecutor < RubyExecutorService
14+
include Concern::Deprecation
1315

1416
# @!macro thread_pool_executor_constant_default_max_pool_size
1517
DEFAULT_MAX_POOL_SIZE = 2_147_483_647 # java.lang.Integer::MAX_VALUE
@@ -95,8 +97,16 @@ def remaining_capacity
9597
end
9698

9799
# @!visibility private
98-
def remove_busy_worker(worker)
99-
synchronize { ns_remove_busy_worker worker }
100+
def prunable_capacity
101+
synchronize { ns_prunable_capacity }
102+
end
103+
104+
# @!visibility private
105+
def remove_worker(worker)
106+
synchronize do
107+
ns_remove_ready_worker worker
108+
ns_remove_busy_worker worker
109+
end
100110
end
101111

102112
# @!visibility private
@@ -116,7 +126,7 @@ def worker_task_completed
116126

117127
# @!macro thread_pool_executor_method_prune_pool
118128
def prune_pool
119-
synchronize { ns_prune_pool }
129+
deprecated "#prune_pool has no effect and will be removed in next the release, see https://github.com/ruby-concurrency/concurrent-ruby/pull/1082."
120130
end
121131

122132
private
@@ -146,9 +156,6 @@ def ns_initialize(opts)
146156
@largest_length = 0
147157
@workers_counter = 0
148158
@ruby_pid = $$ # detects if Ruby has forked
149-
150-
@gc_interval = opts.fetch(:gc_interval, @idletime / 2.0).to_i # undocumented
151-
@next_gc_time = Concurrent.monotonic_time + @gc_interval
152159
end
153160

154161
# @!visibility private
@@ -162,12 +169,10 @@ def ns_execute(*args, &task)
162169

163170
if ns_assign_worker(*args, &task) || ns_enqueue(*args, &task)
164171
@scheduled_task_count += 1
172+
nil
165173
else
166-
return fallback_action(*args, &task)
174+
fallback_action(*args, &task)
167175
end
168-
169-
ns_prune_pool if @next_gc_time < Concurrent.monotonic_time
170-
nil
171176
end
172177

173178
# @!visibility private
@@ -218,7 +223,7 @@ def ns_assign_worker(*args, &task)
218223
# @!visibility private
219224
def ns_enqueue(*args, &task)
220225
return false if @synchronous
221-
226+
222227
if !ns_limited_queue? || @queue.size < @max_queue
223228
@queue << [task, args]
224229
true
@@ -265,7 +270,7 @@ def ns_ready_worker(worker, last_message, success = true)
265270
end
266271
end
267272

268-
# removes a worker which is not in not tracked in @ready
273+
# removes a worker which is not tracked in @ready
269274
#
270275
# @!visibility private
271276
def ns_remove_busy_worker(worker)
@@ -274,23 +279,19 @@ def ns_remove_busy_worker(worker)
274279
true
275280
end
276281

277-
# try oldest worker if it is idle for enough time, it's returned back at the start
278-
#
279-
# @!visibility private
280-
def ns_prune_pool
281-
now = Concurrent.monotonic_time
282-
stopped_workers = 0
283-
while !@ready.empty? && (@pool.size - stopped_workers > @min_length)
284-
worker, last_message = @ready.first
285-
if now - last_message > self.idletime
286-
stopped_workers += 1
287-
@ready.shift
288-
worker << :stop
289-
else break
290-
end
282+
def ns_remove_ready_worker(worker)
283+
if index = @ready.index { |rw, _| rw == worker }
284+
@ready.delete_at(index)
291285
end
286+
true
287+
end
292288

293-
@next_gc_time = Concurrent.monotonic_time + @gc_interval
289+
def ns_prunable_capacity
290+
if running?
291+
[@pool.size - @min_length, @ready.size].min
292+
else
293+
@pool.size
294+
end
294295
end
295296

296297
def ns_reset_if_forked
@@ -312,7 +313,7 @@ class Worker
312313

313314
def initialize(pool, id)
314315
# instance variables accessed only under pool's lock so no need to sync here again
315-
@queue = Queue.new
316+
@queue = Collection::TimeoutQueue.new
316317
@pool = pool
317318
@thread = create_worker @queue, pool, pool.idletime
318319

@@ -338,17 +339,26 @@ def kill
338339
def create_worker(queue, pool, idletime)
339340
Thread.new(queue, pool, idletime) do |my_queue, my_pool, my_idletime|
340341
catch(:stop) do
341-
loop do
342+
prunable = true
342343

343-
case message = my_queue.pop
344+
loop do
345+
timeout = prunable && my_pool.running? ? my_idletime : nil
346+
case message = my_queue.pop(timeout: timeout)
347+
when nil
348+
if my_pool.prunable_capacity.positive?
349+
my_pool.remove_worker(self)
350+
throw :stop
351+
end
352+
353+
prunable = false
344354
when :stop
345-
my_pool.remove_busy_worker(self)
355+
my_pool.remove_worker(self)
346356
throw :stop
347-
348357
else
349358
task, args = message
350359
run_task my_pool, task, args
351360
my_pool.ready_worker(self, Concurrent.monotonic_time)
361+
prunable = true
352362
end
353363
end
354364
end

spec/concurrent/executor/cached_thread_pool_spec.rb

+15-15
Original file line numberDiff line numberDiff line change
@@ -152,15 +152,13 @@ module Concurrent
152152

153153
context 'garbage collection' do
154154

155-
subject { described_class.new(idletime: 0.1, max_threads: 2, gc_interval: 0) }
155+
subject { described_class.new(idletime: 0.1, max_threads: 2) }
156156

157157
it 'removes from pool any thread that has been idle too long' do
158158
latch = Concurrent::CountDownLatch.new(4)
159159
4.times { subject.post { sleep 0.1; latch.count_down } }
160+
sleep 0.4
160161
expect(latch.wait(1)).to be true
161-
sleep 0.2
162-
subject.post {}
163-
sleep 0.2
164162
expect(subject.length).to be < 4
165163
end
166164

@@ -197,25 +195,27 @@ module Concurrent
197195
expect(subject.length).to be >= 5
198196
3.times { subject << proc { sleep(1) } }
199197
sleep(0.1)
200-
expect(subject.length).to be >= 5
198+
expect(subject.length).to be >= 3
201199
end
202200
end
203201
end
204202

205203
context 'stress' do
206204
configurations = [
207-
{ min_threads: 2,
208-
max_threads: ThreadPoolExecutor::DEFAULT_MAX_POOL_SIZE,
209-
idletime: 0.1, # 1 minute
210-
max_queue: 0, # unlimited
205+
{
206+
min_threads: 2,
207+
max_threads: ThreadPoolExecutor::DEFAULT_MAX_POOL_SIZE,
208+
idletime: 60, # 1 minute
209+
max_queue: 0, # unlimited
211210
fallback_policy: :caller_runs, # shouldn't matter -- 0 max queue
212-
gc_interval: 0.1 },
213-
{ min_threads: 2,
214-
max_threads: 4,
215-
idletime: 0.1, # 1 minute
216-
max_queue: 0, # unlimited
211+
},
212+
{
213+
min_threads: 2,
214+
max_threads: 4,
215+
idletime: 60, # 1 minute
216+
max_queue: 0, # unlimited
217217
fallback_policy: :caller_runs, # shouldn't matter -- 0 max queue
218-
gc_interval: 0.1 }
218+
}
219219
]
220220

221221
configurations.each do |config|

spec/concurrent/executor/java_thread_pool_executor_spec.rb

-7
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,6 @@ module Concurrent
2626

2727
it_should_behave_like :thread_pool_executor
2828

29-
context :prune do
30-
it "is a no-op, pruning is handled by the JVM" do
31-
executor = JavaThreadPoolExecutor.new
32-
executor.prune_pool
33-
end
34-
end
35-
3629
context '#overload_policy' do
3730

3831
specify ':abort maps to AbortPolicy' do

0 commit comments

Comments
 (0)