diff --git a/CHANGELOG.md b/CHANGELOG.md index 7f8a3f27..8b613a7c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ ## [Unreleased] +- Fix: `Shoryuken::Client.queues` no longer builds the same queue more than once under concurrency (mensfeld) + - The cache used an unsynchronized `@@queues[name] ||= Shoryuken::Queue.new(...)`. Building a queue makes + SQS API calls, and that I/O releases the GVL, so concurrent first-access (dispatch, processor-completion + and worker threads all call it) built the queue multiple times - redundant API calls, and a corrupt cache + on JRuby/TruffleRuby + - Access to the cache is now guarded by a mutex + - Fix: Polling strategies are now thread-safe, and WeightedRoundRobin unpauses processed queues reliably (mensfeld) - `message_processed` runs on processor-completion threads (for FIFO queues) while `next_queue`/`messages_found` run on the dispatch thread; they mutate the same state with no synchronization, which is benign on MRI (GVL) diff --git a/lib/shoryuken/client.rb b/lib/shoryuken/client.rb index 0a4bc6a3..663fa62a 100644 --- a/lib/shoryuken/client.rb +++ b/lib/shoryuken/client.rb @@ -7,13 +7,19 @@ class Client # @return [Hash{String => Shoryuken::Queue}] cached queue instances by name @@queues = {} + # Guards the queue cache. queues is called concurrently from the dispatch + # thread, processor-completion threads and worker threads, and building a + # Shoryuken::Queue makes SQS API calls, so an unsynchronized `||=` would let + # several callers build the same queue (and corrupt the hash on JRuby). + @@queues_mutex = Mutex.new + class << self # Returns a Queue instance for the given queue name # # @param name [String, Symbol] the name of the queue # @return [Shoryuken::Queue] the queue instance def queues(name) - @@queues[name.to_s] ||= Shoryuken::Queue.new(sqs, name) + @@queues_mutex.synchronize { @@queues[name.to_s] ||= Shoryuken::Queue.new(sqs, name) } end # Returns the current SQS client @@ -30,7 +36,7 @@ def sqs def sqs=(sqs) # Since the @@queues values (Shoryuken::Queue objects) are built referencing @@sqs, if it changes, we need to # re-build them on subsequent calls to `.queues(name)`. - @@queues = {} + @@queues_mutex.synchronize { @@queues = {} } Shoryuken.sqs_client = sqs end diff --git a/spec/lib/shoryuken/client_spec.rb b/spec/lib/shoryuken/client_spec.rb index 52b2de57..4909ec9e 100644 --- a/spec/lib/shoryuken/client_spec.rb +++ b/spec/lib/shoryuken/client_spec.rb @@ -17,5 +17,23 @@ expect(Shoryuken::Client.queues(queue_name).url).to eq queue_url expect(Shoryuken::Client.queues(queue_name).url).to eq queue_url end + + it 'constructs each queue only once under concurrent first access' do + allow(described_class).to receive(:sqs).and_return(sqs) + + construction_count = Shoryuken::Helpers::AtomicCounter.new(0) + allow(Shoryuken::Queue).to receive(:new) do + construction_count.increment + # Mimic the SQS API latency during construction. sleep releases the GVL, + # so without synchronization every concurrent caller builds its own queue. + sleep 0.05 + instance_double(Shoryuken::Queue) + end + + threads = Array.new(10) { Thread.new { described_class.queues('concurrent') } } + threads.each(&:join) + + expect(construction_count.value).to eq(1) + end end end