Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
## [Unreleased]

- Fix: `Shoryuken::Client.queues` no longer builds the same queue more than once under concurrency (mensfeld)
- The cache used an unsynchronized `@@queues[name] ||= Shoryuken::Queue.new(...)`. Building a queue makes
SQS API calls, and that I/O releases the GVL, so concurrent first-access (dispatch, processor-completion
and worker threads all call it) built the queue multiple times - redundant API calls, and a corrupt cache
on JRuby/TruffleRuby
- Access to the cache is now guarded by a mutex

- Fix: Polling strategies are now thread-safe, and WeightedRoundRobin unpauses processed queues reliably (mensfeld)
- `message_processed` runs on processor-completion threads (for FIFO queues) while `next_queue`/`messages_found`
run on the dispatch thread; they mutate the same state with no synchronization, which is benign on MRI (GVL)
Expand Down
10 changes: 8 additions & 2 deletions lib/shoryuken/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,19 @@ class Client
# @return [Hash{String => Shoryuken::Queue}] cached queue instances by name
@@queues = {}

# Guards the queue cache. queues is called concurrently from the dispatch
# thread, processor-completion threads and worker threads, and building a
# Shoryuken::Queue makes SQS API calls, so an unsynchronized `||=` would let
# several callers build the same queue (and corrupt the hash on JRuby).
@@queues_mutex = Mutex.new

class << self
# Returns a Queue instance for the given queue name
#
# @param name [String, Symbol] the name of the queue
# @return [Shoryuken::Queue] the queue instance
def queues(name)
@@queues[name.to_s] ||= Shoryuken::Queue.new(sqs, name)
@@queues_mutex.synchronize { @@queues[name.to_s] ||= Shoryuken::Queue.new(sqs, name) }
end

# Returns the current SQS client
Expand All @@ -30,7 +36,7 @@ def sqs
def sqs=(sqs)
# Since the @@queues values (Shoryuken::Queue objects) are built referencing @@sqs, if it changes, we need to
# re-build them on subsequent calls to `.queues(name)`.
@@queues = {}
@@queues_mutex.synchronize { @@queues = {} }

Shoryuken.sqs_client = sqs
end
Expand Down
18 changes: 18 additions & 0 deletions spec/lib/shoryuken/client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,23 @@
expect(Shoryuken::Client.queues(queue_name).url).to eq queue_url
expect(Shoryuken::Client.queues(queue_name).url).to eq queue_url
end

it 'constructs each queue only once under concurrent first access' do
allow(described_class).to receive(:sqs).and_return(sqs)

construction_count = Shoryuken::Helpers::AtomicCounter.new(0)
allow(Shoryuken::Queue).to receive(:new) do
construction_count.increment
# Mimic the SQS API latency during construction. sleep releases the GVL,
# so without synchronization every concurrent caller builds its own queue.
sleep 0.05
instance_double(Shoryuken::Queue)
end

threads = Array.new(10) { Thread.new { described_class.queues('concurrent') } }
threads.each(&:join)

expect(construction_count.value).to eq(1)
end
end
end