Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions lib/shoryuken/active_job/current_attributes.rb
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,10 @@ module Loading
# @param hash [Hash] the deserialized job data
# @return [void]
def perform(sqs_msg, hash)
klasses_to_reset = []

CurrentAttributes.cattrs&.each do |key, klass_name|
next unless hash.key?(key)

klass = klass_name.constantize
klasses_to_reset << klass

begin
attrs = Serializer.deserialize(hash[key])
Expand All @@ -135,7 +132,16 @@ def perform(sqs_msg, hash)

super
ensure
klasses_to_reset.each(&:reset)
# Reset every registered CurrentAttributes class, not only the ones
# present in this message. A message without a cattr key (enqueued
# before persist was configured, by a different producer, or with an
# empty context) would otherwise leave a previous job's values set on a
# reused worker thread, leaking context into the next job.
CurrentAttributes.cattrs&.each_value do |klass_name|
klass_name.constantize.reset
rescue => e
Shoryuken.logger.warn("Failed to reset CurrentAttributes #{klass_name}: #{e.message}")
end
end
end
end
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# frozen_string_literal: true

# A message without a CurrentAttributes key must not inherit a previous job's
# context. Every registered class is reset after each job - even keyless ones -
# so values set during one job don't leak into the next on a reused worker
# thread.

setup_sqs
setup_active_job

require 'active_support/current_attributes'
require 'shoryuken/active_job/current_attributes'

queue_name = DT.queue
create_test_queue(queue_name)

class LeakCurrent < ActiveSupport::CurrentAttributes
attribute :user_id
end

Shoryuken::ActiveJob::CurrentAttributes.persist(LeakCurrent)

class CrossJobResetTestJob < ActiveJob::Base
def perform
# Record what the worker thread sees at the start of the job, then dirty
# Current. Without a reset after a keyless job, the second execution on the
# same thread would observe 'leaked' instead of nil.
DT[:observed] << LeakCurrent.user_id
LeakCurrent.user_id = 'leaked'
end
end

CrossJobResetTestJob.queue_as(queue_name)

# Concurrency 1 so the two jobs run sequentially on the same worker thread.
Shoryuken.add_group('default', 1)
Shoryuken.add_queue(queue_name, 1, 'default')
Shoryuken.register_worker(queue_name, Shoryuken::ActiveJob::JobWrapper)

# Enqueued with an empty context, so both messages carry no cattr key.
CrossJobResetTestJob.perform_later
CrossJobResetTestJob.perform_later

poll_queues_until(timeout: 30) { DT[:observed].size >= 2 }

assert_equal([nil, nil], DT[:observed].first(2),
"CurrentAttributes leaked across jobs: #{DT[:observed].inspect}")