Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions app/lib/actions/katello/event_queue/monitor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
module Actions
module Katello
module EventQueue
class Monitor < Actions::EntryAction
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default rescue_strategy is to pause the action. Iirc paused actions still hold the singleton lock. Using any of the other two strategies which would cause the lock to be unlocked on failure would probably be better idea for more hands-off operation.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Iirc foreman-maintain performs checks for long running tasks before upgrade, we'll probably need to whitelist this new action in there.

include ::Dynflow::Action::Singleton
include ::Dynflow::Action::Polling

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unless stated otherwise, actions use the default queue. The default sidekiq queue is already used for almost everything, is it ok that event processing might be delayed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be OK in combination with your prior point: instead of using a Poll -> Handle ping-pong the Poll will fully drain the queue in a loop which can be broken early with if world.terminating? to make sure shutdown isn't blocked.

def self.launch(world)
unless self.singleton_locked?(world)
ForemanTasks.trigger(self)
end
end

def invoke_external_task
::Katello::EventQueue.initialize
action_logger.info "Katello Event Queue initialized"
end

def run(event = nil)
case event
when Skip
# noop
else
super
end
end

def stop_condition
-> { world.terminating? }
end

def poll_external_task
poller = ::Katello::EventMonitor::PollerThread.new
User.as_anonymous_admin do
poller.drain_queue(-> { world.terminating? })
end
end

def done?
false
end

def poll_intervals
[2]
end

def rescue_strategy
::Dynflow::Action::Rescue::Skip
end

def humanized_name
_("Katello Event Queue")
end
end
end
end
end
11 changes: 8 additions & 3 deletions app/models/katello/ping.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,15 @@ def event_daemon_status(status, result)
end
end

def event_queue_foreman_tasks
ForemanTasks::Task.running.where(label: Actions::Katello::EventQueue::Monitor.name)
end

def ping_katello_events(result)
exception_watch(result) do
status = Katello::EventDaemon::Runner.service_status(:katello_events)
event_daemon_status(status, result)
if event_queue_foreman_tasks.empty?
result[:status] = FAIL_RETURN_CODE
end
end
end

Expand Down Expand Up @@ -238,7 +243,7 @@ def ping_services_for_capsule(services, capsule_id)
ping_pulp_with_auth(result[:pulp_auth], result[:pulp][:status], capsule_id) if result.include?(:pulp_auth)
ping_candlepin_with_auth(result[:candlepin_auth]) if result.include?(:candlepin_auth)
ping_foreman_tasks(result[:foreman_tasks]) if result.include?(:foreman_tasks)
ping_katello_events(result[:katello_events]) if result.include?(:katello_events)
ping_katello_events(result[:katello_events]) if result.include?(:katello_events) # Wait for foreman-tasks result
ping_candlepin_events(result[:candlepin_events]) if result.include?(:candlepin_events)

# set overall status result code
Expand Down
111 changes: 18 additions & 93 deletions app/services/katello/event_monitor/poller_thread.rb
Original file line number Diff line number Diff line change
@@ -1,106 +1,31 @@
module Katello
module EventMonitor
# TODO: Move this class to app/lib/katello/event_daemon/services with other service definitions
class PollerThread
SLEEP_INTERVAL = 3

cattr_accessor :instance

def self.initialize(logger = nil)
self.instance ||= self.new(logger)
end

def self.close
if self.instance
self.instance.close
self.instance = nil
end
end

def self.run
initialize
::Katello::EventQueue.reset_in_progress
instance.poll_for_events
end

def self.status
instance&.status
end

def initialize(logger = nil)
@logger = logger || ::Foreman::Logging.logger('katello/katello_events')
@failed_count = 0
@processed_count = 0
def initialize(logger = Rails.logger, queue = Katello::EventQueue)
@logger = logger
@queue = queue
end

def close
@logger.info("Stopping Katello Event Monitor")
@thread&.kill
end

def running?
@thread&.status || false
end

def status
{
processed_count: @processed_count,
failed_count: @failed_count,
running: running?,
}
end

def run_event(event)
Katello::Logging.time("katello event handled") do |data|
data[:type] = event.event_type
data[:object_id] = event.object_id
data[:expired] = false
data[:rescheduled] = false

event_instance = nil
begin
::User.as_anonymous_admin do
event_instance = ::Katello::EventQueue.create_instance(event)
event_instance.run
end
rescue => e
@failed_count += 1
@logger.error("event_queue_error: type=#{event.event_type}, object_id=#{event.object_id}")
@logger.error(e.message)
@logger.error(e.backtrace.join("\n"))
ensure
if event_instance.try(:retry)
result = ::Katello::EventQueue.reschedule_event(event)
if result == :expired
@logger.warn("event_queue_event_expired: type=#{event.event_type} object_id=#{event.object_id}")
elsif !result.nil?
@logger.warn("event_queue_rescheduled: type=#{event.event_type} object_id=#{event.object_id}")
end
end
::Katello::EventQueue.clear_events(event.event_type, event.object_id, event.created_at)
def drain_queue(stop_condition)
until stop_condition.call || (event = @queue.next_event).nil?
Katello::Logging.time('katello event handled') do |data|
data[:event_type] = event.event_type
data[:object_id] = event.object_id
run_event(event)
end
end
end

def poll_for_events
@thread = Thread.new do
@logger.info("Polling Katello Event Queue")
loop do
Rails.application.executor.wrap do
Katello::Util::Support.with_db_connection(@logger) do
until (event = ::Katello::EventQueue.next_event).nil?
run_event(event)
@processed_count += 1
end
end
end

sleep SLEEP_INTERVAL
def run_event(event)
@queue.mark_in_progress(event)
begin
event_instance = @queue.create_instance(event)
event_instance.run
ensure
if event_instance.try(:retry)
@queue.reschedule_event(event)
end
rescue => e
@logger.error(e.message)
@logger.error("Fatal error in Katello Event Monitor")
self.class.close
@queue.clear_events(event.event_type, event.object_id)
end
end
end
Expand Down
23 changes: 14 additions & 9 deletions app/services/katello/event_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,24 @@ class EventQueue
@event_types = {}

def self.create_instance(event)
return unless supported_event_types.include?(event.event_type)

event_class = ::Katello::EventQueue.event_class(event.event_type)

event_class.new(event.object_id) do |instance|
instance.metadata = event.metadata if event.metadata
end
end

def self.initialize
delete_unsupported_events
reset_in_progress
end

def self.delete_unsupported_events
Katello::Event.where.not(event_type: supported_event_types).delete_all
end

def self.queue_depth
::Katello::Event.all.size
end
Expand All @@ -20,17 +31,12 @@ def self.runnable_events
Katello::Event.where(process_after: nil).or(Katello::Event.where(process_after: Date.new..Time.zone.now))
end

def self.clear_events(event_type, object_id, on_or_earlier_than)
Katello::Event.where(:in_progress => true, :object_id => object_id, :event_type => event_type).where('created_at <= ?', on_or_earlier_than).delete_all
def self.clear_events(event_type, object_id)
Katello::Event.where(:in_progress => true, :object_id => object_id, :event_type => event_type).delete_all
end

def self.next_event
first = runnable_events.where(:in_progress => false).order(:created_at => 'asc').first
return if first.nil?
last = runnable_events.where(:in_progress => false, :object_id => first.object_id,
:event_type => first.event_type).order(:created_at => 'desc').first
mark_in_progress(first)
last
runnable_events.where(in_progress: false).order(created_at: :asc).first
end

def self.mark_in_progress(event)
Expand Down Expand Up @@ -79,7 +85,6 @@ def self.supported_event_types
end

def self.event_class(event_type)
fail _("Invalid event_type %s") % event_type if @event_types[event_type].nil?
@event_types[event_type].constantize
end
end
Expand Down
9 changes: 8 additions & 1 deletion lib/katello/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@ class Engine < ::Rails::Engine
ForemanTasks.dynflow.eager_load_actions!
end

initializer "katello.start_katello_events", before: :finisher_hook do
unless Rails.env.test?
ForemanTasks.dynflow.config.post_executor_init do |world|
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the action stops/pauses/whatever, the event queue won't be processed until the dynflow-sidekiq@orchestrator service is restarted. We should pay extra care to make the action essential immortal

Actions::Katello::EventQueue::Monitor.launch(world)
end
end
end

# make sure the Katello plugin is initialized before `after_initialize`
# hook so that the resumed Dynflow tasks can rely on everything ready.
initializer 'katello.register_plugin', :before => :finisher_hook, :after => 'foreman_remote_execution.register_plugin' do |app|
Expand Down Expand Up @@ -147,7 +155,6 @@ class Engine < ::Rails::Engine
end

Katello::EventDaemon::Runner.register_service(:candlepin_events, Katello::CandlepinEventListener)
Katello::EventDaemon::Runner.register_service(:katello_events, Katello::EventMonitor::PollerThread)

# Lib Extensions
::Foreman::Renderer::Scope::Variables::Base.include Katello::Concerns::RendererExtensions
Expand Down
48 changes: 48 additions & 0 deletions test/actions/katello/event_queue/monitor_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
require 'katello_test_helper'

module ::Actions::Katello::EventQueue
class MonitorTest < ActiveSupport::TestCase
include Dynflow::Testing

class ::Dynflow::Testing::DummyCoordinator
def find_locks(_filter)
[]
end

def acquire(_lock)
true
end
end

let(:action_class) { ::Actions::Katello::EventQueue::Monitor }

let(:planned_action) do
action = create_action action_class
plan_action action
end

let(:running_action) do
Katello::EventQueue.expects(:initialize)

run_action planned_action
end

def test_run
Katello::EventMonitor::PollerThread.any_instance.expects(:drain_queue)

progress_action_time running_action

assert_equal :suspended, running_action.state
assert_nil running_action.output[:last_error]
end

def test_run_error
Katello::EventMonitor::PollerThread.any_instance.stubs(:drain_queue).raises(StandardError)

progress_action_time running_action

assert_equal :suspended, running_action.state
assert_equal event.id, running_action.output.dig(:last_error, :handler, :event, :id)
end
end
end
7 changes: 7 additions & 0 deletions test/factories/event_factory.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
FactoryBot.define do
factory :katello_event, class: Katello::Event do
trait :in_progress do
in_progress { true }
end
end
end
21 changes: 2 additions & 19 deletions test/models/ping_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -123,36 +123,19 @@ def test_ping_candlepin_not_running
end

def test_ping_katello_events
Katello::EventDaemon::Runner
.expects(:service_status).with(:katello_events)
.returns(processed_count: 0, failed_count: 0, running: true)

result = Katello::Ping.ping_katello_events({})

assert_equal 'ok', result[:status]
assert_equal '0 Processed, 0 Failed', result[:message]
end

def test_ping_katello_events_starting
Katello::EventDaemon::Runner
.expects(:service_status).with(:katello_events)
.returns(running: 'starting')
Katello::Ping.stubs(:event_queue_foreman_tasks).returns([stub(:task)])

result = Katello::Ping.ping_katello_events({})

assert_equal 'ok', result[:status]
assert_equal '0 Processed, 0 Failed', result[:message]
end

def test_ping_katello_events_not_running
Katello::EventDaemon::Runner
.expects(:service_status).with(:katello_events)
.returns(processed_count: 10, failed_count: 5, queue_depth: 1001)
Katello::Ping.expects(:event_queue_foreman_tasks).returns([])

result = Katello::Ping.ping_katello_events({})

assert_equal 'FAIL', result[:status]
assert_equal 'Not running', result[:message]
end
end

Expand Down
Loading
Loading