From 3ae02e0f0a0e53ec77e0bd032797da79b78505b8 Mon Sep 17 00:00:00 2001 From: Maciej Mensfeld Date: Sun, 14 Jun 2026 12:53:33 +0200 Subject: [PATCH] Synchronize the Shoryuken::Client queue cache Client.queues cached queues with an unsynchronized `@@queues[name] ||= Shoryuken::Queue.new(...)`. Building a queue makes SQS API calls, and that I/O releases the GVL, so concurrent first-access - the dispatch thread, processor-completion threads and worker threads all call Client.queues - built the same queue multiple times: redundant get_queue_url/get_queue_attributes calls, and a corrupt cache hash on JRuby/TruffleRuby. Access to the cache (and the reset in sqs=) is now guarded by a class-level mutex. The lock is held during construction, but queues are built once at first access (and pre-cached at startup by validate_queues), so steady-state lookups are fast hash reads. Coverage: - unit spec that 10 concurrent first-access callers build the queue only once (got 10 before the fix, since Queue construction sleeps/IO releases the GVL) --- CHANGELOG.md | 7 +++++++ lib/shoryuken/client.rb | 10 ++++++++-- spec/lib/shoryuken/client_spec.rb | 18 ++++++++++++++++++ 3 files changed, 33 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 729d8b86..b959a5c1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ ## [Unreleased] +- Fix: `Shoryuken::Client.queues` no longer builds the same queue more than once under concurrency (mensfeld) + - The cache used an unsynchronized `@@queues[name] ||= Shoryuken::Queue.new(...)`. Building a queue makes + SQS API calls, and that I/O releases the GVL, so concurrent first-access (dispatch, processor-completion + and worker threads all call it) built the queue multiple times - redundant API calls, and a corrupt cache + on JRuby/TruffleRuby + - Access to the cache is now guarded by a mutex + - Fix: Repeated graceful stop no longer deadlocks the process (mensfeld) - `Manager#await_dispatching_in_progress` popped a signal queue that received exactly one token, so a second `Launcher#stop` blocked forever on an empty queue diff --git a/lib/shoryuken/client.rb b/lib/shoryuken/client.rb index 0a4bc6a3..663fa62a 100644 --- a/lib/shoryuken/client.rb +++ b/lib/shoryuken/client.rb @@ -7,13 +7,19 @@ class Client # @return [Hash{String => Shoryuken::Queue}] cached queue instances by name @@queues = {} + # Guards the queue cache. queues is called concurrently from the dispatch + # thread, processor-completion threads and worker threads, and building a + # Shoryuken::Queue makes SQS API calls, so an unsynchronized `||=` would let + # several callers build the same queue (and corrupt the hash on JRuby). + @@queues_mutex = Mutex.new + class << self # Returns a Queue instance for the given queue name # # @param name [String, Symbol] the name of the queue # @return [Shoryuken::Queue] the queue instance def queues(name) - @@queues[name.to_s] ||= Shoryuken::Queue.new(sqs, name) + @@queues_mutex.synchronize { @@queues[name.to_s] ||= Shoryuken::Queue.new(sqs, name) } end # Returns the current SQS client @@ -30,7 +36,7 @@ def sqs def sqs=(sqs) # Since the @@queues values (Shoryuken::Queue objects) are built referencing @@sqs, if it changes, we need to # re-build them on subsequent calls to `.queues(name)`. - @@queues = {} + @@queues_mutex.synchronize { @@queues = {} } Shoryuken.sqs_client = sqs end diff --git a/spec/lib/shoryuken/client_spec.rb b/spec/lib/shoryuken/client_spec.rb index 52b2de57..4909ec9e 100644 --- a/spec/lib/shoryuken/client_spec.rb +++ b/spec/lib/shoryuken/client_spec.rb @@ -17,5 +17,23 @@ expect(Shoryuken::Client.queues(queue_name).url).to eq queue_url expect(Shoryuken::Client.queues(queue_name).url).to eq queue_url end + + it 'constructs each queue only once under concurrent first access' do + allow(described_class).to receive(:sqs).and_return(sqs) + + construction_count = Shoryuken::Helpers::AtomicCounter.new(0) + allow(Shoryuken::Queue).to receive(:new) do + construction_count.increment + # Mimic the SQS API latency during construction. sleep releases the GVL, + # so without synchronization every concurrent caller builds its own queue. + sleep 0.05 + instance_double(Shoryuken::Queue) + end + + threads = Array.new(10) { Thread.new { described_class.queues('concurrent') } } + threads.each(&:join) + + expect(construction_count.value).to eq(1) + end end end