Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions lib/shoryuken/manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,17 @@ def assign(queue_name, sqs_msg)
processor_done(queue_name)
end
end
rescue Concurrent::RejectedExecutionError
# The executor was shut down (or a bounded custom launcher_executor is
# saturated) between the running? check above and the post. The promise
# body - and therefore processor_done - never ran, so roll back the
# increment here. Leaking it would permanently shrink `ready`
# (@max_processors - busy) until dispatch stalls and the group stops
# processing. The message was never processed, so we must not run the
# FIFO message_processed callback - decrement directly instead.
@busy_processors.decrement
fire_utilization_update_event
nil
end

# Dispatches a batch of messages from a queue
Expand Down
24 changes: 24 additions & 0 deletions spec/lib/shoryuken/manager_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,30 @@
expect(polling_strategy).to have_received(:message_processed).once
expect(subject.send(:busy)).to eq(0)
end

context 'when the executor rejects the worker post' do
# A real ExecutorService instance (Concurrent::Promise validates the
# executor type), with running? stubbed true so #assign proceeds past its
# guard and post rejecting - exactly the shutdown race / saturated
# bounded launcher_executor case. ImmediateExecutor spawns no background
# thread, so nothing lingers at teardown.
let(:executor) do
Concurrent::ImmediateExecutor.new.tap do |rejecting_executor|
allow(rejecting_executor).to receive(:running?).and_return(true)
allow(rejecting_executor).to receive(:post).and_raise(Concurrent::RejectedExecutionError)
end
end

it 'rolls back the busy counter and does not leak it' do
# The promise body never runs, so completion must not run either
# (decrement directly, and no FIFO message_processed callback).
expect(subject).not_to receive(:processor_done)

expect { subject.send(:assign, queue, sqs_msg) }.not_to raise_error

expect(subject.send(:busy)).to eq(0)
end
end
end

describe '#processor_done' do
Expand Down