Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions lib/shoryuken/launcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ def stop!

shutdown_executor

drain_pending_active_job_sends

fire_event(:stopped)
end

Expand All @@ -62,6 +64,8 @@ def stop

shutdown_executor

drain_pending_active_job_sends

fire_event(:stopped)
end

Expand Down Expand Up @@ -103,6 +107,30 @@ def shutdown_executor
executor.kill unless executor.wait_for_termination(Shoryuken.options[:timeout])
end

# Drains in-flight asynchronous ActiveJob sends so jobs enqueued just before
# shutdown (e.g. by a worker during processing) are flushed to SQS instead of
# being dropped when the process exits.
#
# No-op unless ActiveJob is loaded and its configured adapter supports
# draining (i.e. ShoryukenConcurrentSendAdapter). Bounded by the configured
# timeout so it can never block shutdown indefinitely, and any error is
# swallowed so a draining hiccup can't break the shutdown sequence.
#
# @return [void]
def drain_pending_active_job_sends
return unless defined?(::ActiveJob::Base)

adapter = ::ActiveJob::Base.queue_adapter
return unless adapter.respond_to?(:wait_for_pending_sends)

logger.info { 'Draining in-flight ActiveJob sends' }

drained = adapter.wait_for_pending_sends(Shoryuken.options[:timeout])
logger.warn { 'Timed out draining in-flight ActiveJob sends; some may not have been delivered' } unless drained
rescue => e
logger.warn { "Error draining in-flight ActiveJob sends: #{e.class}: #{e.message}" }
end

# Returns the executor for running async operations
#
# Owns a dedicated executor rather than borrowing Concurrent.global_io_executor:
Expand Down
45 changes: 45 additions & 0 deletions spec/lib/shoryuken/launcher_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,51 @@
end
end

describe 'draining in-flight ActiveJob sends on shutdown' do
let(:adapter) { double('queue_adapter') }

before do
allow(first_group_manager).to receive(:stop_new_dispatching)
allow(first_group_manager).to receive(:await_dispatching_in_progress)
allow(second_group_manager).to receive(:stop_new_dispatching)
allow(second_group_manager).to receive(:await_dispatching_in_progress)

# Make ActiveJob present and point its adapter at our double, regardless of
# whether active_job happens to be loaded by another spec.
stub_const('ActiveJob::Base', Class.new)
allow(ActiveJob::Base).to receive(:queue_adapter).and_return(adapter)
end

it 'drains pending sends on graceful stop when the adapter supports it' do
expect(adapter).to receive(:wait_for_pending_sends).with(Shoryuken.options[:timeout]).and_return(true)

subject.stop
end

it 'drains pending sends on immediate stop! when the adapter supports it' do
expect(adapter).to receive(:wait_for_pending_sends).with(Shoryuken.options[:timeout]).and_return(true)

subject.stop!
end

it 'is a no-op when the adapter does not support draining' do
# adapter is a plain double, so it does not respond to wait_for_pending_sends
expect { subject.stop }.not_to raise_error
end

it 'does not break shutdown when draining raises' do
allow(adapter).to receive(:wait_for_pending_sends).and_raise('drain boom')

expect { subject.stop }.not_to raise_error
end

it 'completes shutdown even if draining times out' do
allow(adapter).to receive(:wait_for_pending_sends).and_return(false)

expect { subject.stop }.not_to raise_error
end
end

describe 'executor ownership' do
context 'when no launcher_executor is configured' do
before { allow(Shoryuken).to receive(:launcher_executor).and_return(nil) }
Expand Down