diff --git a/app/lib/actions/katello/event_queue/monitor.rb b/app/lib/actions/katello/event_queue/monitor.rb new file mode 100644 index 00000000000..1f17860365e --- /dev/null +++ b/app/lib/actions/katello/event_queue/monitor.rb @@ -0,0 +1,57 @@ +module Actions + module Katello + module EventQueue + class Monitor < Actions::EntryAction + include ::Dynflow::Action::Singleton + include ::Dynflow::Action::Polling + + def self.launch(world) + unless self.singleton_locked?(world) + ForemanTasks.trigger(self) + end + end + + def invoke_external_task + ::Katello::EventQueue.initialize + action_logger.info "Katello Event Queue initialized" + end + + def run(event = nil) + case event + when Skip + # noop + else + super + end + end + + def stop_condition + -> { world.terminating? } + end + + def poll_external_task + poller = ::Katello::EventMonitor::PollerThread.new + User.as_anonymous_admin do + poller.drain_queue(-> { world.terminating? }) + end + end + + def done? + false + end + + def poll_intervals + [2] + end + + def rescue_strategy + ::Dynflow::Action::Rescue::Skip + end + + def humanized_name + _("Katello Event Queue") + end + end + end + end +end diff --git a/app/models/katello/ping.rb b/app/models/katello/ping.rb index a4743cc31a0..49dbede360a 100644 --- a/app/models/katello/ping.rb +++ b/app/models/katello/ping.rb @@ -49,10 +49,15 @@ def event_daemon_status(status, result) end end + def event_queue_foreman_tasks + ForemanTasks::Task.running.where(label: Actions::Katello::EventQueue::Monitor.name) + end + def ping_katello_events(result) exception_watch(result) do - status = Katello::EventDaemon::Runner.service_status(:katello_events) - event_daemon_status(status, result) + if event_queue_foreman_tasks.empty? + result[:status] = FAIL_RETURN_CODE + end end end @@ -238,7 +243,7 @@ def ping_services_for_capsule(services, capsule_id) ping_pulp_with_auth(result[:pulp_auth], result[:pulp][:status], capsule_id) if result.include?(:pulp_auth) ping_candlepin_with_auth(result[:candlepin_auth]) if result.include?(:candlepin_auth) ping_foreman_tasks(result[:foreman_tasks]) if result.include?(:foreman_tasks) - ping_katello_events(result[:katello_events]) if result.include?(:katello_events) + ping_katello_events(result[:katello_events]) if result.include?(:katello_events) # Wait for foreman-tasks result ping_candlepin_events(result[:candlepin_events]) if result.include?(:candlepin_events) # set overall status result code diff --git a/app/services/katello/event_monitor/poller_thread.rb b/app/services/katello/event_monitor/poller_thread.rb index a5355c92683..90db5e5b972 100644 --- a/app/services/katello/event_monitor/poller_thread.rb +++ b/app/services/katello/event_monitor/poller_thread.rb @@ -1,106 +1,31 @@ module Katello module EventMonitor - # TODO: Move this class to app/lib/katello/event_daemon/services with other service definitions class PollerThread - SLEEP_INTERVAL = 3 - - cattr_accessor :instance - - def self.initialize(logger = nil) - self.instance ||= self.new(logger) - end - - def self.close - if self.instance - self.instance.close - self.instance = nil - end - end - - def self.run - initialize - ::Katello::EventQueue.reset_in_progress - instance.poll_for_events - end - - def self.status - instance&.status - end - - def initialize(logger = nil) - @logger = logger || ::Foreman::Logging.logger('katello/katello_events') - @failed_count = 0 - @processed_count = 0 + def initialize(logger = Rails.logger, queue = Katello::EventQueue) + @logger = logger + @queue = queue end - def close - @logger.info("Stopping Katello Event Monitor") - @thread&.kill - end - - def running? - @thread&.status || false - end - - def status - { - processed_count: @processed_count, - failed_count: @failed_count, - running: running?, - } - end - - def run_event(event) - Katello::Logging.time("katello event handled") do |data| - data[:type] = event.event_type - data[:object_id] = event.object_id - data[:expired] = false - data[:rescheduled] = false - - event_instance = nil - begin - ::User.as_anonymous_admin do - event_instance = ::Katello::EventQueue.create_instance(event) - event_instance.run - end - rescue => e - @failed_count += 1 - @logger.error("event_queue_error: type=#{event.event_type}, object_id=#{event.object_id}") - @logger.error(e.message) - @logger.error(e.backtrace.join("\n")) - ensure - if event_instance.try(:retry) - result = ::Katello::EventQueue.reschedule_event(event) - if result == :expired - @logger.warn("event_queue_event_expired: type=#{event.event_type} object_id=#{event.object_id}") - elsif !result.nil? - @logger.warn("event_queue_rescheduled: type=#{event.event_type} object_id=#{event.object_id}") - end - end - ::Katello::EventQueue.clear_events(event.event_type, event.object_id, event.created_at) + def drain_queue(stop_condition) + until stop_condition.call || (event = @queue.next_event).nil? + Katello::Logging.time('katello event handled') do |data| + data[:event_type] = event.event_type + data[:object_id] = event.object_id + run_event(event) end end end - def poll_for_events - @thread = Thread.new do - @logger.info("Polling Katello Event Queue") - loop do - Rails.application.executor.wrap do - Katello::Util::Support.with_db_connection(@logger) do - until (event = ::Katello::EventQueue.next_event).nil? - run_event(event) - @processed_count += 1 - end - end - end - - sleep SLEEP_INTERVAL + def run_event(event) + @queue.mark_in_progress(event) + begin + event_instance = @queue.create_instance(event) + event_instance.run + ensure + if event_instance.try(:retry) + @queue.reschedule_event(event) end - rescue => e - @logger.error(e.message) - @logger.error("Fatal error in Katello Event Monitor") - self.class.close + @queue.clear_events(event.event_type, event.object_id) end end end diff --git a/app/services/katello/event_queue.rb b/app/services/katello/event_queue.rb index 0adf2cdb48f..5bc082029d7 100644 --- a/app/services/katello/event_queue.rb +++ b/app/services/katello/event_queue.rb @@ -5,6 +5,8 @@ class EventQueue @event_types = {} def self.create_instance(event) + return unless supported_event_types.include?(event.event_type) + event_class = ::Katello::EventQueue.event_class(event.event_type) event_class.new(event.object_id) do |instance| @@ -12,6 +14,15 @@ def self.create_instance(event) end end + def self.initialize + delete_unsupported_events + reset_in_progress + end + + def self.delete_unsupported_events + Katello::Event.where.not(event_type: supported_event_types).delete_all + end + def self.queue_depth ::Katello::Event.all.size end @@ -20,17 +31,12 @@ def self.runnable_events Katello::Event.where(process_after: nil).or(Katello::Event.where(process_after: Date.new..Time.zone.now)) end - def self.clear_events(event_type, object_id, on_or_earlier_than) - Katello::Event.where(:in_progress => true, :object_id => object_id, :event_type => event_type).where('created_at <= ?', on_or_earlier_than).delete_all + def self.clear_events(event_type, object_id) + Katello::Event.where(:in_progress => true, :object_id => object_id, :event_type => event_type).delete_all end def self.next_event - first = runnable_events.where(:in_progress => false).order(:created_at => 'asc').first - return if first.nil? - last = runnable_events.where(:in_progress => false, :object_id => first.object_id, - :event_type => first.event_type).order(:created_at => 'desc').first - mark_in_progress(first) - last + runnable_events.where(in_progress: false).order(created_at: :asc).first end def self.mark_in_progress(event) @@ -79,7 +85,6 @@ def self.supported_event_types end def self.event_class(event_type) - fail _("Invalid event_type %s") % event_type if @event_types[event_type].nil? @event_types[event_type].constantize end end diff --git a/lib/katello/engine.rb b/lib/katello/engine.rb index 04ce82c84f9..42cef966f59 100644 --- a/lib/katello/engine.rb +++ b/lib/katello/engine.rb @@ -77,6 +77,14 @@ class Engine < ::Rails::Engine ForemanTasks.dynflow.eager_load_actions! end + initializer "katello.start_katello_events", before: :finisher_hook do + unless Rails.env.test? + ForemanTasks.dynflow.config.post_executor_init do |world| + Actions::Katello::EventQueue::Monitor.launch(world) + end + end + end + # make sure the Katello plugin is initialized before `after_initialize` # hook so that the resumed Dynflow tasks can rely on everything ready. initializer 'katello.register_plugin', :before => :finisher_hook, :after => 'foreman_remote_execution.register_plugin' do |app| @@ -147,7 +155,6 @@ class Engine < ::Rails::Engine end Katello::EventDaemon::Runner.register_service(:candlepin_events, Katello::CandlepinEventListener) - Katello::EventDaemon::Runner.register_service(:katello_events, Katello::EventMonitor::PollerThread) # Lib Extensions ::Foreman::Renderer::Scope::Variables::Base.include Katello::Concerns::RendererExtensions diff --git a/test/actions/katello/event_queue/monitor_test.rb b/test/actions/katello/event_queue/monitor_test.rb new file mode 100644 index 00000000000..6cb3aac8bc5 --- /dev/null +++ b/test/actions/katello/event_queue/monitor_test.rb @@ -0,0 +1,48 @@ +require 'katello_test_helper' + +module ::Actions::Katello::EventQueue + class MonitorTest < ActiveSupport::TestCase + include Dynflow::Testing + + class ::Dynflow::Testing::DummyCoordinator + def find_locks(_filter) + [] + end + + def acquire(_lock) + true + end + end + + let(:action_class) { ::Actions::Katello::EventQueue::Monitor } + + let(:planned_action) do + action = create_action action_class + plan_action action + end + + let(:running_action) do + Katello::EventQueue.expects(:initialize) + + run_action planned_action + end + + def test_run + Katello::EventMonitor::PollerThread.any_instance.expects(:drain_queue) + + progress_action_time running_action + + assert_equal :suspended, running_action.state + assert_nil running_action.output[:last_error] + end + + def test_run_error + Katello::EventMonitor::PollerThread.any_instance.stubs(:drain_queue).raises(StandardError) + + progress_action_time running_action + + assert_equal :suspended, running_action.state + assert_equal event.id, running_action.output.dig(:last_error, :handler, :event, :id) + end + end +end diff --git a/test/factories/event_factory.rb b/test/factories/event_factory.rb new file mode 100644 index 00000000000..29cc75db312 --- /dev/null +++ b/test/factories/event_factory.rb @@ -0,0 +1,7 @@ +FactoryBot.define do + factory :katello_event, class: Katello::Event do + trait :in_progress do + in_progress { true } + end + end +end diff --git a/test/models/ping_test.rb b/test/models/ping_test.rb index d73e4b46d1f..cd06f40c844 100644 --- a/test/models/ping_test.rb +++ b/test/models/ping_test.rb @@ -123,36 +123,19 @@ def test_ping_candlepin_not_running end def test_ping_katello_events - Katello::EventDaemon::Runner - .expects(:service_status).with(:katello_events) - .returns(processed_count: 0, failed_count: 0, running: true) - - result = Katello::Ping.ping_katello_events({}) - - assert_equal 'ok', result[:status] - assert_equal '0 Processed, 0 Failed', result[:message] - end - - def test_ping_katello_events_starting - Katello::EventDaemon::Runner - .expects(:service_status).with(:katello_events) - .returns(running: 'starting') + Katello::Ping.stubs(:event_queue_foreman_tasks).returns([stub(:task)]) result = Katello::Ping.ping_katello_events({}) assert_equal 'ok', result[:status] - assert_equal '0 Processed, 0 Failed', result[:message] end def test_ping_katello_events_not_running - Katello::EventDaemon::Runner - .expects(:service_status).with(:katello_events) - .returns(processed_count: 10, failed_count: 5, queue_depth: 1001) + Katello::Ping.expects(:event_queue_foreman_tasks).returns([]) result = Katello::Ping.ping_katello_events({}) assert_equal 'FAIL', result[:status] - assert_equal 'Not running', result[:message] end end diff --git a/test/services/katello/event_monitor/poller_thread_test.rb b/test/services/katello/event_monitor/poller_thread_test.rb index ebf60ac4014..699f34abad4 100644 --- a/test/services/katello/event_monitor/poller_thread_test.rb +++ b/test/services/katello/event_monitor/poller_thread_test.rb @@ -3,34 +3,49 @@ module Katello module EventMonitor class PollerThreadTest < ActiveSupport::TestCase - def test_run - Katello::EventMonitor::PollerThread.any_instance.expects(:poll_for_events) + let(:poller) { Katello::EventMonitor::PollerThread.new(Rails.logger, @queue) } - Katello::EventMonitor::PollerThread.run + def setup + @queue = stub(:queue) + end + + def test_drain_stop_condition + @queue.expects(:next_event).never - Katello::EventMonitor::PollerThread.close + poller.drain_queue(-> { true }) + end + + def test_drain_queue_empty + event = build_stubbed(:katello_event) + @queue.stubs(:next_event).returns(event, nil) + + poller.expects(:run_event).with(event) + + poller.drain_queue(-> { false }) end def test_run_event - event = Katello::Event.new(object_id: 100, event_type: 'import_pool') - Katello::EventMonitor::PollerThread.any_instance.expects(:poll_for_events) - Katello::Events::ImportPool.any_instance.expects(:run) - Katello::EventMonitor::PollerThread.run + event = build_stubbed(:katello_event, object_id: 100, event_type: 'whatever') + event_instance = stub(run: true) - Katello::EventMonitor::PollerThread.instance.run_event(event) + @queue.expects(:create_instance).returns(event_instance) + @queue.expects(:mark_in_progress).with(event) + @queue.expects(:clear_events).with('whatever', 100) + @queue.expects(:reschedule_event).never - Katello::EventMonitor::PollerThread.close + poller.run_event(event) end - def test_status - status = { - processed_count: 0, - failed_count: 0, - running: false, - } - Katello::EventMonitor::PollerThread.initialize + def test_run_reschedule + event = build_stubbed(:katello_event, event_type: 'whatever', object_id: 100) + event_instance = stub(:event_instance, run: true, retry: true) + + @queue.expects(:create_instance).returns(event_instance) + @queue.expects(:mark_in_progress).with(event) + @queue.expects(:clear_events).with('whatever', 100) + @queue.expects(:reschedule_event).with(event).once - assert_equal status, Katello::EventMonitor::PollerThread.status + poller.run_event(event) end end end diff --git a/test/services/katello/event_queue_test.rb b/test/services/katello/event_queue_test.rb index 2bd6e3321a7..df7111b8137 100644 --- a/test/services/katello/event_queue_test.rb +++ b/test/services/katello/event_queue_test.rb @@ -28,7 +28,7 @@ def setup end def test_create_instance - event = EventQueue.push_event(@type, 1) + event = build_stubbed(:katello_event, event_type: @type) instance = EventQueue.create_instance(event) @@ -37,9 +37,7 @@ def test_create_instance def test_create_instance_with_metadata metadata = { admin_password: 'sekret' } - event = EventQueue.push_event(MockEventWithMetadata::EVENT_TYPE, 1) do |attrs| - attrs[:metadata] = metadata - end + event = build_stubbed(:katello_event, event_type: MockEventWithMetadata::EVENT_TYPE, metadata: metadata) instance = EventQueue.create_instance(event) @@ -47,50 +45,15 @@ def test_create_instance_with_metadata assert_equal metadata, instance.metadata end - def test_clear_events_only_deletes_last - Event.destroy_all - - event = EventQueue.push_event(@type, 1) - event2 = EventQueue.push_event(@type, 1) - event3 = EventQueue.push_event(@type, 1) - event2.update!(:created_at => event2.created_at - 5.minutes) - event3.update!(:created_at => event3.created_at + 5.minutes) - Event.update_all(:in_progress => true) - refute_empty Event.all - - EventQueue.clear_events(event.event_type, 1, event.created_at) - assert_equal [event3], Event.all - end - def test_clear_events_only_delete_inprogress - Event.destroy_all - - event = EventQueue.push_event(@type, 1) - EventQueue.clear_events(@type, 1, event.created_at) - - assert_equal [event], Event.all - EventQueue.mark_in_progress(event) - EventQueue.clear_events(@type, 1, event.created_at) + event = create(:katello_event, object_id: 1, event_type: @type) + EventQueue.clear_events(@type, 1) - assert_empty Event.all - end - - def test_clear_events_delete_process_after - # Given 2 events E1, E2 - # E1 fails and is rescheduled for later - # E2 is received, runs successfully - # E1 should also be removed to avoid redundant run - - EventQueue.push_event(@type, 1) - failed_event = EventQueue.next_event - EventQueue.reschedule_event(failed_event) - - EventQueue.push_event(@type, 1) - success_event = EventQueue.next_event + assert_equal [event], Event.where(object_id: 1, event_type: @type) + event.update(in_progress: true) + EventQueue.clear_events(@type, 1) - EventQueue.clear_events(@type, 1, success_event.created_at) - - assert_empty Event.all + assert_empty Event.where(object_id: 1, event_type: @type) end def test_event_class @@ -102,32 +65,16 @@ def test_supported_event_types end def test_next_event - EventQueue.register_event('foo', Object) - - event = EventQueue.push_event(@type, 1) - event2 = EventQueue.push_event('foo', 1) - event3 = EventQueue.push_event(@type, 1) + older = create(:katello_event, event_type: @type, object_id: 1) + create(:katello_event, event_type: @type, object_id: 1) next_event = EventQueue.next_event - assert_equal event3, next_event - assert event.reload.in_progress - assert event3.reload.in_progress - refute event2.reload.in_progress - end - def test_next_event_nil - EventQueue.register_event('foo', Object) - Event.destroy_all - assert_nil EventQueue.next_event - - EventQueue.push_event('foo', 1) - refute_nil EventQueue.next_event + assert_equal older, next_event end def test_next_event_process_after - event = EventQueue.push_event(@type, 1) do |attrs| - attrs[:process_after] = Time.zone.now + 5.minutes - end + event = create(:katello_event, event_type: @type, object_id: 1, process_after: 5.minutes.from_now) # next event should not return an event with a process_after date > now assert_nil EventQueue.next_event @@ -141,10 +88,8 @@ def test_next_event_process_after def test_mark_in_progress # marking a new event received while there are rescheduled # events for the same will mark both in progress - urgent_event = EventQueue.push_event(@type, 1) - deferred_event = EventQueue.push_event(@type, 1) do |attrs| - attrs[:process_after] = Time.zone.now + 5.minutes - end + urgent_event = create(:katello_event, event_type: @type, object_id: 1) + deferred_event = create(:katello_event, event_type: @type, object_id: 1, process_after: 5.minutes.from_now) EventQueue.mark_in_progress(urgent_event) urgent_event.reload @@ -158,14 +103,8 @@ def test_mark_in_progress_process_after # two events with the same object_id but staggered # process_after timestamps should not be marked in_progress # if we are in between the process_after times - - process_sooner = EventQueue.push_event(@type, 1) do |attrs| - attrs[:process_after] = 1.minute.from_now - end - - process_later = EventQueue.push_event(@type, 1) do |attrs| - attrs[:process_after] = 2.minutes.from_now - end + process_sooner = create(:katello_event, event_type: @type, object_id: 1, process_after: 1.minute.from_now) + process_later = create(:katello_event, event_type: @type, object_id: 1, process_after: 2.minutes.from_now) travel_to 90.seconds.from_now do Katello::EventQueue.mark_in_progress(process_sooner) @@ -179,8 +118,7 @@ def test_mark_in_progress_process_after end def test_reschedule_event - EventQueue.push_event(@type, 1) - event = Katello::EventQueue.next_event + event = create(:katello_event, :in_progress, event_type: @type, object_id: 1, process_after: nil) assert Katello::EventQueue.reschedule_event(event) event.reload @@ -190,23 +128,17 @@ def test_reschedule_event end def test_reschedule_event_no_retry - EventQueue.push_event(@type, 1) - event = EventQueue.next_event - + event = build_stubbed(:katello_event, object_id: 1, event_type: @type) + Katello::Event.expects(:update).never MockEvent.stubs(:retry_seconds) - assert_nil Katello::EventQueue.reschedule_event(event) - event.reload - assert event.in_progress - refute event.process_after + assert_nil Katello::EventQueue.reschedule_event(event) end def test_reschedule_event_expired - event = EventQueue.push_event(@type, 1) + event = build_stubbed(:katello_event, event_type: @type, object_id: 1, created_at: 7.hours.ago) - travel_to 7.hours.from_now do - assert_equal :expired, Katello::EventQueue.reschedule_event(event) - end + assert_equal :expired, Katello::EventQueue.reschedule_event(event) end end end