diff --git a/examples/halt.rb b/examples/halt.rb new file mode 100755 index 00000000..fa56a181 --- /dev/null +++ b/examples/halt.rb @@ -0,0 +1,71 @@ +#!/usr/bin/env ruby +# frozen_string_literal: true + +require_relative 'example_helper' + +example_description = < i + 1) } + plan_action Sleeper, :time => 20 + end + plan_self + end + end + + def run + # Noop + end +end + +if $PROGRAM_NAME == __FILE__ + puts example_description + + ExampleHelper.world.action_logger.level = Logger::DEBUG + ExampleHelper.world + t = ExampleHelper.world.trigger(Wrapper) + Thread.new do + sleep 8 + ExampleHelper.world.halt(t.id) + end + + ExampleHelper.run_web_console +end diff --git a/lib/dynflow/coordinator.rb b/lib/dynflow/coordinator.rb index 74aa64ae..56422771 100644 --- a/lib/dynflow/coordinator.rb +++ b/lib/dynflow/coordinator.rb @@ -266,6 +266,19 @@ def self.valid_classes end end + class ExecutionInhibitionLock < Lock + def initialize(execution_plan_id) + super + @data[:owner_id] = "execution-plan:#{execution_plan_id}" + @data[:execution_plan_id] = execution_plan_id + @data[:id] = self.class.lock_id(execution_plan_id) + end + + def self.lock_id(execution_plan_id) + "execution-plan:#{execution_plan_id}" + end + end + class ExecutionLock < LockByWorld def initialize(world, execution_plan_id, client_world_id, request_id) super(world) diff --git a/lib/dynflow/director.rb b/lib/dynflow/director.rb index 2e5fde18..45b813cb 100644 --- a/lib/dynflow/director.rb +++ b/lib/dynflow/director.rb @@ -246,8 +246,28 @@ def terminate end end + def halt(event) + halt_execution(event.execution_plan_id) + end + private + def halt_execution(execution_plan_id) + manager = @execution_plan_managers[execution_plan_id] + @logger.warn "Halting execution plan #{execution_plan_id}" + return halt_inactive(execution_plan_id) unless manager + + manager.halt + finish_manager manager + end + + def halt_inactive(execution_plan_id) + plan = @world.persistence.load_execution_plan(execution_plan_id) + plan.update_state(:stopped) + rescue => e + @logger.error e + end + def unless_done(manager, work_items) return [] unless manager if manager.done? @@ -310,6 +330,14 @@ def track_execution_plan(execution_plan_id, finished) "cannot execute execution_plan_id:#{execution_plan_id} it's stopped" end + lock_class = Coordinator::ExecutionInhibitionLock + filters = { class: lock_class.to_s, owner_id: lock_class.lock_id(execution_plan_id) } + if @world.coordinator.find_records(filters).any? + halt_execution(execution_plan_id) + raise Dynflow::Error, + "cannot execute execution_plan_id:#{execution_plan_id} it's execution is inhibited" + end + @execution_plan_managers[execution_plan_id] = ExecutionPlanManager.new(@world, execution_plan, finished) rescue Dynflow::Error => e diff --git a/lib/dynflow/director/execution_plan_manager.rb b/lib/dynflow/director/execution_plan_manager.rb index e729dc62..7bc638ab 100644 --- a/lib/dynflow/director/execution_plan_manager.rb +++ b/lib/dynflow/director/execution_plan_manager.rb @@ -13,6 +13,7 @@ def initialize(world, execution_plan, future) @execution_plan = Type! execution_plan, ExecutionPlan @future = Type! future, Concurrent::Promises::ResolvableFuture @running_steps_manager = RunningStepsManager.new(world) + @halted = false unless [:planned, :paused].include? execution_plan.state raise "execution_plan is not in pending or paused state, it's #{execution_plan.state}" @@ -25,6 +26,11 @@ def start start_run or start_finalize or finish end + def halt + @halted = true + @running_steps_manager.terminate + end + def restart @run_manager = nil @finalize_manager = nil @@ -72,7 +78,7 @@ def event(event) end def done? - (!@run_manager || @run_manager.done?) && (!@finalize_manager || @finalize_manager.done?) + @halted || (!@run_manager || @run_manager.done?) && (!@finalize_manager || @finalize_manager.done?) end def terminate @@ -88,6 +94,7 @@ def update_steps(steps) def compute_next_from_step(step) raise "run manager not set" unless @run_manager raise "run manager already done" if @run_manager.done? + return [] if @halted next_steps = @run_manager.what_is_next(step) if @run_manager.done? diff --git a/lib/dynflow/director/running_steps_manager.rb b/lib/dynflow/director/running_steps_manager.rb index 11639678..25ee3f31 100644 --- a/lib/dynflow/director/running_steps_manager.rb +++ b/lib/dynflow/director/running_steps_manager.rb @@ -16,6 +16,7 @@ def initialize(world) # to handle potential updates of the step object (that is part of the event) @events = QueueHash.new(Integer, Director::Event) @events_by_request_id = {} + @halted = false end def terminate @@ -27,6 +28,10 @@ def terminate end end + def halt + @halted = true + end + def add(step, work) Type! step, ExecutionPlan::Steps::RunStep @running_steps[step.id] = step @@ -84,6 +89,10 @@ def event(event) event.result.reject UnprocessableEvent.new('step is not suspended, it cannot process events') return [] end + if @halted + event.result.reject UnprocessableEvent.new('execution plan is halted, it cannot receive events') + return [] + end can_run_event = @work_items.empty?(step.id) @events_by_request_id[event.request_id] = event diff --git a/lib/dynflow/dispatcher.rb b/lib/dynflow/dispatcher.rb index ab839ee1..403f3662 100644 --- a/lib/dynflow/dispatcher.rb +++ b/lib/dynflow/dispatcher.rb @@ -29,7 +29,11 @@ module Dispatcher execution_plan_id: type { variants String, NilClass } end - variants Event, Execution, Ping, Status, Planning + Halt = type do + fields! execution_plan_id: String, optional: Algebrick::Types::Boolean + end + + variants Event, Execution, Ping, Status, Planning, Halt end Response = Algebrick.type do diff --git a/lib/dynflow/dispatcher/client_dispatcher.rb b/lib/dynflow/dispatcher/client_dispatcher.rb index 8e7d6830..8a31ec96 100644 --- a/lib/dynflow/dispatcher/client_dispatcher.rb +++ b/lib/dynflow/dispatcher/client_dispatcher.rb @@ -141,6 +141,10 @@ def dispatch_request(request, client_world_id, request_id) ignore_unknown = event.optional find_executor(event.execution_plan_id) end), + (on ~Halt do |event| + executor = find_executor(event.execution_plan_id) + executor == Dispatcher::UnknownWorld ? AnyExecutor : executor + end), (on Ping.(~any, ~any) | Status.(~any, ~any) do |receiver_id, _| receiver_id end) @@ -236,7 +240,7 @@ def resolve_tracked_request(id, error = nil) (on Execution.(execution_plan_id: ~any) do |uuid| @world.persistence.load_execution_plan(uuid) end), - (on Event | Ping do + (on Event | Ping | Halt do true end) @tracked_requests.delete(id).success! resolve_to diff --git a/lib/dynflow/dispatcher/executor_dispatcher.rb b/lib/dynflow/dispatcher/executor_dispatcher.rb index 9de38547..7061f12f 100644 --- a/lib/dynflow/dispatcher/executor_dispatcher.rb +++ b/lib/dynflow/dispatcher/executor_dispatcher.rb @@ -13,7 +13,8 @@ def handle_request(envelope) on(Planning) { perform_planning(envelope, envelope.message) }, on(Execution) { perform_execution(envelope, envelope.message) }, on(Event) { perform_event(envelope, envelope.message) }, - on(Status) { get_execution_status(envelope, envelope.message) }) + on(Status) { get_execution_status(envelope, envelope.message) }, + on(Halt) { halt_execution_plan(envelope, envelope.message) }) end protected @@ -52,6 +53,11 @@ def when_done(plan, envelope, execution, execution_lock) end end + def halt_execution_plan(envelope, execution_plan_id) + @world.executor.halt execution_plan_id + respond(envelope, Done) + end + def perform_event(envelope, event_request) future = on_finish do |f| f.then do diff --git a/lib/dynflow/execution_plan.rb b/lib/dynflow/execution_plan.rb index 971ff3fc..e2dd2513 100644 --- a/lib/dynflow/execution_plan.rb +++ b/lib/dynflow/execution_plan.rb @@ -133,7 +133,9 @@ def update_state(state, history_notice: :auto) telemetry_common_options.merge(:result => key.to_s)) end hooks_to_run << key + world.persistence.delete_delayed_plans(:execution_plan_uuid => id) if delay_record && original == :scheduled unlock_all_singleton_locks! + unlock_execution_inhibition_lock! when :paused unlock_all_singleton_locks! else @@ -566,6 +568,14 @@ def unlock_all_singleton_locks! end end + def unlock_execution_inhibition_lock! + filter = { :owner_id => 'execution-plan:' + self.id, + :class => Dynflow::Coordinator::ExecutionInhibitionLock.to_s } + world.coordinator.find_locks(filter).each do |lock| + world.coordinator.release(lock) + end + end + def toggle_telemetry_state(original, new) return if original == new @label = root_plan_step.action_class if @label.nil? diff --git a/lib/dynflow/executors/abstract/core.rb b/lib/dynflow/executors/abstract/core.rb index 5669e362..ff3fa7b2 100644 --- a/lib/dynflow/executors/abstract/core.rb +++ b/lib/dynflow/executors/abstract/core.rb @@ -66,6 +66,10 @@ def handle_persistence_error(error, work = nil) end end + def halt(execution_plan_id) + @director.halt execution_plan_id + end + def start_termination(*args) logger.info 'shutting down Core ...' super diff --git a/lib/dynflow/executors/parallel.rb b/lib/dynflow/executors/parallel.rb index c9c26fd4..ba735cbb 100644 --- a/lib/dynflow/executors/parallel.rb +++ b/lib/dynflow/executors/parallel.rb @@ -57,6 +57,10 @@ def execution_status(execution_plan_id = nil) @core.ask!([:execution_status, execution_plan_id]) end + def halt(execution_plan_id) + @core.tell([:halt, execution_plan_id]) + end + def initialized @core_initialized end diff --git a/lib/dynflow/world.rb b/lib/dynflow/world.rb index a99670c7..0f3b5f07 100644 --- a/lib/dynflow/world.rb +++ b/lib/dynflow/world.rb @@ -4,6 +4,7 @@ require 'dynflow/world/invalidation' module Dynflow + # rubocop:disable Metrics/ClassLength class World include Algebrick::TypeCheck include Algebrick::Matching @@ -252,6 +253,11 @@ def get_execution_status(world_id, execution_plan_id, timeout, done = Concurrent publish_request(Dispatcher::Status[world_id, execution_plan_id], done, false, timeout) end + def halt(execution_plan_id, accepted = Concurrent::Promises.resolvable_future) + coordinator.acquire(Coordinator::ExecutionInhibitionLock.new(execution_plan_id)) + publish_request(Dispatcher::Halt[execution_plan_id], accepted, false) + end + def publish_request(request, done, wait_for_accepted, timeout = nil) accepted = Concurrent::Promises.resolvable_future accepted.rescue do |reason| @@ -390,4 +396,5 @@ def spawn_and_wait(klass, name, *args) return actor end end + # rubocop:enable Metrics/ClassLength end diff --git a/lib/dynflow/world/invalidation.rb b/lib/dynflow/world/invalidation.rb index 069ed25b..8f1fd87b 100644 --- a/lib/dynflow/world/invalidation.rb +++ b/lib/dynflow/world/invalidation.rb @@ -29,12 +29,27 @@ def invalidate(world) end end + prune_execution_inhibition_locks! + pruned = persistence.prune_envelopes(world.id) logger.error("Pruned #{pruned} envelopes for invalidated world #{world.id}") unless pruned.zero? coordinator.delete_world(world) end end + # Prunes execution inhibition locks which got somehow left behind. + # Any execution inhibition locks, which have their corresponding execution + # plan in stopped state, will be removed. + def prune_execution_inhibition_locks! + locks = coordinator.find_locks(class: Coordinator::ExecutionInhibitionLock.name) + uuids = locks.map { |lock| lock.data[:execution_plan_id] } + plan_uuids = persistence.find_execution_plans(filters: { uuid: uuids, state: 'stopped' }).map(&:id) + + locks.select { |lock| plan_uuids.include? lock.data[:execution_plan_id] }.each do |lock| + coordinator.release(lock) + end + end + def invalidate_planning_lock(planning_lock) with_valid_execution_plan_for_lock(planning_lock) do |plan| plan.steps.values.each { |step| invalidate_step step } diff --git a/test/executor_test.rb b/test/executor_test.rb index a5b48e90..3ade6400 100644 --- a/test/executor_test.rb +++ b/test/executor_test.rb @@ -718,6 +718,99 @@ def assert_next_steps(expected_next_step_ids, finished_step_id = nil, success = assert [world.terminate, world.terminate].map(&:value).all? end end + + describe 'halting' do + include TestHelpers + let(:world) { WorldFactory.create_world } + + it 'halts an execution plan with a suspended step' do + triggered = world.trigger(Support::DummyExample::PlanEventsAction, ping_time: 1) + plan = world.persistence.load_execution_plan(triggered.id) + wait_for do + plan = world.persistence.load_execution_plan(triggered.id) + plan.state == :running + end + world.halt(triggered.id) + wait_for('the execution plan to halt') do + plan = world.persistence.load_execution_plan(triggered.id) + plan.state == :stopped + end + _(plan.steps[2].state).must_equal :suspended + end + + it 'halts a paused execution plan' do + triggered = world.trigger(Support::DummyExample::FailingDummy) + plan = world.persistence.load_execution_plan(triggered.id) + wait_for do + plan = world.persistence.load_execution_plan(triggered.id) + plan.state == :paused + end + world.halt(plan.id) + wait_for('the execution plan to halt') do + plan = world.persistence.load_execution_plan(triggered.id) + plan.state == :stopped + end + _(plan.steps[2].state).must_equal :error + end + + it 'halts a planned execution plan' do + plan = world.plan(Support::DummyExample::Dummy) + wait_for do + plan = world.persistence.load_execution_plan(plan.id) + plan.state == :planned + end + world.halt(plan.id) + wait_for('the execution plan to halt') do + plan = world.persistence.load_execution_plan(plan.id) + plan.state == :stopped + end + _(plan.steps[2].state).must_equal :pending + end + + it 'halts a scheduled execution plan' do + plan = world.delay(Support::DummyExample::Dummy, { start_at: Time.now + 120 }) + wait_for do + plan = world.persistence.load_execution_plan(plan.id) + plan.state == :scheduled + end + world.halt(plan.id) + wait_for('the execution plan to halt') do + plan = world.persistence.load_execution_plan(plan.id) + plan.state == :stopped + end + _(plan.delay_record).must_be :nil? + _(plan.steps[1].state).must_equal :pending + end + + it 'halts a pending execution plan' do + plan = ExecutionPlan.new(world, nil) + plan.save + world.halt(plan.id) + wait_for('the execution plan to halt') do + plan = world.persistence.load_execution_plan(plan.id) + plan.state == :stopped + end + end + end + + describe 'execution inhibition locks' do + include TestHelpers + let(:world) { WorldFactory.create_world } + + it 'inhibits execution' do + plan = world.plan(Support::DummyExample::Dummy) + world.coordinator.acquire(Coordinator::ExecutionInhibitionLock.new(plan.id)) + triggered = world.execute(plan.id) + triggered.wait + _(triggered).must_be :rejected? + + plan = world.persistence.load_execution_plan(plan.id) + _(plan.state).must_equal :stopped + + locks = world.coordinator.find_locks({ class: Coordinator::ExecutionInhibitionLock.to_s, owner_id: "execution-plan:#{plan.id}" }) + _(locks).must_be :empty? + end + end end end end