Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions examples/halt.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
#!/usr/bin/env ruby
# frozen_string_literal: true

require_relative 'example_helper'

example_description = <<DESC

Halting example
===================

This example shows, how halting works in Dynflow. It spawns a single action,
which in turn spawns a few evented actions and a single action which occupies
the executor for a long time.

Once the halt event is sent, the execution plan is halted, suspended steps
stay suspended forever, running steps stay running until they actually finish
the current run and the execution state is flipped over to stopped state.

You can see the details at #{ExampleHelper::DYNFLOW_URL}

DESC

class EventedCounter < Dynflow::Action
def run(event = nil)
output[:counter] ||= 0
output[:counter] += 1
action_logger.info "Iteration #{output[:counter]}"

if output[:counter] < input[:count]
plan_event(:tick, 5)
suspend
end
action_logger.info "Done"
end
end

class Sleeper < Dynflow::Action
def run
sleep input[:time]
end
end

class Wrapper < Dynflow::Action
def plan
sequence do
concurrence do
5.times { |i| plan_action(EventedCounter, :count => i + 1) }
plan_action Sleeper, :time => 20
end
plan_self
end
end

def run
# Noop
end
end

if $PROGRAM_NAME == __FILE__
puts example_description

ExampleHelper.world.action_logger.level = Logger::DEBUG
ExampleHelper.world
t = ExampleHelper.world.trigger(Wrapper)
Thread.new do
sleep 8
ExampleHelper.world.halt(t.id)
end

ExampleHelper.run_web_console
end
13 changes: 13 additions & 0 deletions lib/dynflow/coordinator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,19 @@ def self.valid_classes
end
end

class ExecutionInhibitionLock < Lock
def initialize(execution_plan_id)
super
@data[:owner_id] = "execution-plan:#{execution_plan_id}"
@data[:execution_plan_id] = execution_plan_id
@data[:id] = self.class.lock_id(execution_plan_id)
end

def self.lock_id(execution_plan_id)
"execution-plan:#{execution_plan_id}"
end
end

class ExecutionLock < LockByWorld
def initialize(world, execution_plan_id, client_world_id, request_id)
super(world)
Expand Down
28 changes: 28 additions & 0 deletions lib/dynflow/director.rb
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,28 @@ def terminate
end
end

def halt(event)
halt_execution(event.execution_plan_id)
end

private

def halt_execution(execution_plan_id)
manager = @execution_plan_managers[execution_plan_id]
@logger.warn "Halting execution plan #{execution_plan_id}"
return halt_inactive(execution_plan_id) unless manager

manager.halt
finish_manager manager
end

def halt_inactive(execution_plan_id)
plan = @world.persistence.load_execution_plan(execution_plan_id)
plan.update_state(:stopped)
rescue => e
@logger.error e
end

def unless_done(manager, work_items)
return [] unless manager
if manager.done?
Expand Down Expand Up @@ -310,6 +330,14 @@ def track_execution_plan(execution_plan_id, finished)
"cannot execute execution_plan_id:#{execution_plan_id} it's stopped"
end

lock_class = Coordinator::ExecutionInhibitionLock
filters = { class: lock_class.to_s, owner_id: lock_class.lock_id(execution_plan_id) }
if @world.coordinator.find_records(filters).any?
halt_execution(execution_plan_id)
raise Dynflow::Error,
"cannot execute execution_plan_id:#{execution_plan_id} it's execution is inhibited"
end

@execution_plan_managers[execution_plan_id] =
ExecutionPlanManager.new(@world, execution_plan, finished)
rescue Dynflow::Error => e
Expand Down
9 changes: 8 additions & 1 deletion lib/dynflow/director/execution_plan_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ def initialize(world, execution_plan, future)
@execution_plan = Type! execution_plan, ExecutionPlan
@future = Type! future, Concurrent::Promises::ResolvableFuture
@running_steps_manager = RunningStepsManager.new(world)
@halted = false

unless [:planned, :paused].include? execution_plan.state
raise "execution_plan is not in pending or paused state, it's #{execution_plan.state}"
Expand All @@ -25,6 +26,11 @@ def start
start_run or start_finalize or finish
end

def halt
@halted = true
@running_steps_manager.terminate
end

def restart
@run_manager = nil
@finalize_manager = nil
Expand Down Expand Up @@ -72,7 +78,7 @@ def event(event)
end

def done?
(!@run_manager || @run_manager.done?) && (!@finalize_manager || @finalize_manager.done?)
@halted || (!@run_manager || @run_manager.done?) && (!@finalize_manager || @finalize_manager.done?)
end

def terminate
Expand All @@ -88,6 +94,7 @@ def update_steps(steps)
def compute_next_from_step(step)
raise "run manager not set" unless @run_manager
raise "run manager already done" if @run_manager.done?
return [] if @halted

next_steps = @run_manager.what_is_next(step)
if @run_manager.done?
Expand Down
9 changes: 9 additions & 0 deletions lib/dynflow/director/running_steps_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ def initialize(world)
# to handle potential updates of the step object (that is part of the event)
@events = QueueHash.new(Integer, Director::Event)
@events_by_request_id = {}
@halted = false
end

def terminate
Expand All @@ -27,6 +28,10 @@ def terminate
end
end

def halt
@halted = true
end

def add(step, work)
Type! step, ExecutionPlan::Steps::RunStep
@running_steps[step.id] = step
Expand Down Expand Up @@ -84,6 +89,10 @@ def event(event)
event.result.reject UnprocessableEvent.new('step is not suspended, it cannot process events')
return []
end
if @halted
event.result.reject UnprocessableEvent.new('execution plan is halted, it cannot receive events')
return []
end

can_run_event = @work_items.empty?(step.id)
@events_by_request_id[event.request_id] = event
Expand Down
6 changes: 5 additions & 1 deletion lib/dynflow/dispatcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@ module Dispatcher
execution_plan_id: type { variants String, NilClass }
end

variants Event, Execution, Ping, Status, Planning
Halt = type do
fields! execution_plan_id: String, optional: Algebrick::Types::Boolean
end

variants Event, Execution, Ping, Status, Planning, Halt
end

Response = Algebrick.type do
Expand Down
6 changes: 5 additions & 1 deletion lib/dynflow/dispatcher/client_dispatcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ def dispatch_request(request, client_world_id, request_id)
ignore_unknown = event.optional
find_executor(event.execution_plan_id)
end),
(on ~Halt do |event|
executor = find_executor(event.execution_plan_id)
executor == Dispatcher::UnknownWorld ? AnyExecutor : executor
end),
(on Ping.(~any, ~any) | Status.(~any, ~any) do |receiver_id, _|
receiver_id
end)
Expand Down Expand Up @@ -236,7 +240,7 @@ def resolve_tracked_request(id, error = nil)
(on Execution.(execution_plan_id: ~any) do |uuid|
@world.persistence.load_execution_plan(uuid)
end),
(on Event | Ping do
(on Event | Ping | Halt do
true
end)
@tracked_requests.delete(id).success! resolve_to
Expand Down
8 changes: 7 additions & 1 deletion lib/dynflow/dispatcher/executor_dispatcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ def handle_request(envelope)
on(Planning) { perform_planning(envelope, envelope.message) },
on(Execution) { perform_execution(envelope, envelope.message) },
on(Event) { perform_event(envelope, envelope.message) },
on(Status) { get_execution_status(envelope, envelope.message) })
on(Status) { get_execution_status(envelope, envelope.message) },
on(Halt) { halt_execution_plan(envelope, envelope.message) })
end

protected
Expand Down Expand Up @@ -52,6 +53,11 @@ def when_done(plan, envelope, execution, execution_lock)
end
end

def halt_execution_plan(envelope, execution_plan_id)
@world.executor.halt execution_plan_id
respond(envelope, Done)
end

def perform_event(envelope, event_request)
future = on_finish do |f|
f.then do
Expand Down
10 changes: 10 additions & 0 deletions lib/dynflow/execution_plan.rb
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,9 @@ def update_state(state, history_notice: :auto)
telemetry_common_options.merge(:result => key.to_s))
end
hooks_to_run << key
world.persistence.delete_delayed_plans(:execution_plan_uuid => id) if delay_record && original == :scheduled
unlock_all_singleton_locks!
unlock_execution_inhibition_lock!
when :paused
unlock_all_singleton_locks!
else
Expand Down Expand Up @@ -566,6 +568,14 @@ def unlock_all_singleton_locks!
end
end

def unlock_execution_inhibition_lock!
filter = { :owner_id => 'execution-plan:' + self.id,
:class => Dynflow::Coordinator::ExecutionInhibitionLock.to_s }
world.coordinator.find_locks(filter).each do |lock|
world.coordinator.release(lock)
end
end

def toggle_telemetry_state(original, new)
return if original == new
@label = root_plan_step.action_class if @label.nil?
Expand Down
4 changes: 4 additions & 0 deletions lib/dynflow/executors/abstract/core.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ def handle_persistence_error(error, work = nil)
end
end

def halt(execution_plan_id)
@director.halt execution_plan_id
end

def start_termination(*args)
logger.info 'shutting down Core ...'
super
Expand Down
4 changes: 4 additions & 0 deletions lib/dynflow/executors/parallel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ def execution_status(execution_plan_id = nil)
@core.ask!([:execution_status, execution_plan_id])
end

def halt(execution_plan_id)
@core.tell([:halt, execution_plan_id])
end

def initialized
@core_initialized
end
Expand Down
7 changes: 7 additions & 0 deletions lib/dynflow/world.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
require 'dynflow/world/invalidation'

module Dynflow
# rubocop:disable Metrics/ClassLength
class World
include Algebrick::TypeCheck
include Algebrick::Matching
Expand Down Expand Up @@ -252,6 +253,11 @@ def get_execution_status(world_id, execution_plan_id, timeout, done = Concurrent
publish_request(Dispatcher::Status[world_id, execution_plan_id], done, false, timeout)
end

def halt(execution_plan_id, accepted = Concurrent::Promises.resolvable_future)
coordinator.acquire(Coordinator::ExecutionInhibitionLock.new(execution_plan_id))
publish_request(Dispatcher::Halt[execution_plan_id], accepted, false)
end

def publish_request(request, done, wait_for_accepted, timeout = nil)
accepted = Concurrent::Promises.resolvable_future
accepted.rescue do |reason|
Expand Down Expand Up @@ -390,4 +396,5 @@ def spawn_and_wait(klass, name, *args)
return actor
end
end
# rubocop:enable Metrics/ClassLength
end
15 changes: 15 additions & 0 deletions lib/dynflow/world/invalidation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,27 @@ def invalidate(world)
end
end

prune_execution_inhibition_locks!

pruned = persistence.prune_envelopes(world.id)
logger.error("Pruned #{pruned} envelopes for invalidated world #{world.id}") unless pruned.zero?
coordinator.delete_world(world)
end
end

# Prunes execution inhibition locks which got somehow left behind.
# Any execution inhibition locks, which have their corresponding execution
# plan in stopped state, will be removed.
def prune_execution_inhibition_locks!
locks = coordinator.find_locks(class: Coordinator::ExecutionInhibitionLock.name)
uuids = locks.map { |lock| lock.data[:execution_plan_id] }
plan_uuids = persistence.find_execution_plans(filters: { uuid: uuids, state: 'stopped' }).map(&:id)

locks.select { |lock| plan_uuids.include? lock.data[:execution_plan_id] }.each do |lock|
coordinator.release(lock)
end
end

def invalidate_planning_lock(planning_lock)
with_valid_execution_plan_for_lock(planning_lock) do |plan|
plan.steps.values.each { |step| invalidate_step step }
Expand Down
Loading