diff --git a/examples/execution_plan_chaining.rb b/examples/execution_plan_chaining.rb new file mode 100755 index 00000000..3468fca7 --- /dev/null +++ b/examples/execution_plan_chaining.rb @@ -0,0 +1,56 @@ +#!/usr/bin/env ruby +# frozen_string_literal: true + +require_relative 'example_helper' + +class DelayedAction < Dynflow::Action + def plan(should_fail = false) + plan_self :should_fail => should_fail + end + + def run + sleep 5 + raise "Controlled failure" if input[:should_fail] + end + + def rescue_strategy + Dynflow::Action::Rescue::Fail + end +end + +if $PROGRAM_NAME == __FILE__ + world = ExampleHelper.create_world do |config| + config.auto_rescue = true + end + world.action_logger.level = 1 + world.logger.level = 0 + + plan1 = world.trigger(DelayedAction) + plan2 = world.chain(plan1.execution_plan_id, DelayedAction) + plan3 = world.chain(plan2.execution_plan_id, DelayedAction) + plan4 = world.chain(plan2.execution_plan_id, DelayedAction) + + plan5 = world.trigger(DelayedAction, true) + plan6 = world.chain(plan5.execution_plan_id, DelayedAction) + + puts <<-MSG.gsub(/^.*\|/, '') + | + | Execution Plan Chaining example + | ======================== + | + | This example shows the execution plan chaining functionality of Dynflow, which allows execution plans to wait until another execution plan finishes. + | + | Execution plans: + | #{plan1.id} runs immediately and should run successfully. + | #{plan2.id} is delayed and should run once #{plan1.id} finishes. + | #{plan3.id} and #{plan4.id} are delayed and should run once #{plan2.id} finishes. + | + | #{plan5.id} runs immediately and is expected to fail. + | #{plan6.id} should not run at all as its prerequisite failed. + | + | Visit #{ExampleHelper::DYNFLOW_URL} to see their status. + | + MSG + + ExampleHelper.run_web_console(world) +end diff --git a/lib/dynflow/debug/telemetry/persistence.rb b/lib/dynflow/debug/telemetry/persistence.rb index ad5cb38c..4d692611 100644 --- a/lib/dynflow/debug/telemetry/persistence.rb +++ b/lib/dynflow/debug/telemetry/persistence.rb @@ -19,7 +19,7 @@ module Persistence :load_execution_plan, :save_execution_plan, :find_old_execution_plans, - :find_past_delayed_plans, + :find_ready_delayed_plans, :delete_delayed_plans, :save_delayed_plan, :set_delayed_plan_frozen, diff --git a/lib/dynflow/delayed_executors/abstract_core.rb b/lib/dynflow/delayed_executors/abstract_core.rb index ec1d08e6..f9ec1833 100644 --- a/lib/dynflow/delayed_executors/abstract_core.rb +++ b/lib/dynflow/delayed_executors/abstract_core.rb @@ -32,7 +32,7 @@ def time def delayed_execution_plans(time) with_error_handling([]) do - world.persistence.find_past_delayed_plans(time) + world.persistence.find_ready_delayed_plans(time) end end diff --git a/lib/dynflow/delayed_plan.rb b/lib/dynflow/delayed_plan.rb index 972ac8c5..5fd1c6e2 100644 --- a/lib/dynflow/delayed_plan.rb +++ b/lib/dynflow/delayed_plan.rb @@ -31,6 +31,12 @@ def timeout error("Execution plan could not be started before set time (#{@start_before})", 'timeout') end + def failed_dependencies(uuids) + bullets = uuids.map { |u| "- #{u}" }.join("\n") + msg = "Execution plan could not be started because some of its prerequisite execution plans failed:\n#{bullets}" + error(msg, 'failed-dependency') + end + def error(message, history_entry = nil) execution_plan.root_plan_step.state = :error execution_plan.root_plan_step.error = ::Dynflow::ExecutionPlan::Steps::Error.new(message) diff --git a/lib/dynflow/director.rb b/lib/dynflow/director.rb index 45b813cb..14e49758 100644 --- a/lib/dynflow/director.rb +++ b/lib/dynflow/director.rb @@ -114,7 +114,15 @@ def execute plan = world.persistence.load_delayed_plan(execution_plan_id) return if plan.nil? || plan.execution_plan.state != :scheduled - if !plan.start_before.nil? && plan.start_before < Time.now.utc() + if plan.start_before.nil? + blocker_ids = world.persistence.find_execution_plan_dependencies(execution_plan_id) + statuses = world.persistence.find_execution_plan_statuses({ filters: { uuid: blocker_ids } }) + failed = statuses.select { |_uuid, status| status[:state] == 'stopped' && status[:result] == 'error' } + if failed.any? + plan.failed_dependencies(failed.keys) + return + end + elsif plan.start_before < Time.now.utc() plan.timeout return end diff --git a/lib/dynflow/persistence.rb b/lib/dynflow/persistence.rb index b5e16d04..3409004a 100644 --- a/lib/dynflow/persistence.rb +++ b/lib/dynflow/persistence.rb @@ -101,8 +101,16 @@ def find_old_execution_plans(age) end end - def find_past_delayed_plans(time) - adapter.find_past_delayed_plans(time).map do |plan| + def find_execution_plan_dependencies(execution_plan_id) + adapter.find_execution_plan_dependencies(execution_plan_id) + end + + def find_blocked_execution_plans(execution_plan_id) + adapter.find_blocked_execution_plans(execution_plan_id) + end + + def find_ready_delayed_plans(time) + adapter.find_ready_delayed_plans(time).map do |plan| DelayedPlan.new_from_hash(@world, plan) end end @@ -163,5 +171,9 @@ def prune_envelopes(receiver_ids) def prune_undeliverable_envelopes adapter.prune_undeliverable_envelopes end + + def chain_execution_plan(first, second) + adapter.chain_execution_plan(first, second) + end end end diff --git a/lib/dynflow/persistence_adapters/abstract.rb b/lib/dynflow/persistence_adapters/abstract.rb index 994fb9b7..48a75e5e 100644 --- a/lib/dynflow/persistence_adapters/abstract.rb +++ b/lib/dynflow/persistence_adapters/abstract.rb @@ -72,7 +72,15 @@ def save_execution_plan(execution_plan_id, value) raise NotImplementedError end - def find_past_delayed_plans(options = {}) + def find_execution_plan_dependencies(execution_plan_id) + raise NotImplementedError + end + + def find_blocked_execution_plans(execution_plan_id) + raise NotImplementedError + end + + def find_ready_delayed_plans(options = {}) raise NotImplementedError end diff --git a/lib/dynflow/persistence_adapters/sequel.rb b/lib/dynflow/persistence_adapters/sequel.rb index 2c321f85..409ce46b 100644 --- a/lib/dynflow/persistence_adapters/sequel.rb +++ b/lib/dynflow/persistence_adapters/sequel.rb @@ -39,7 +39,8 @@ class action_class execution_plan_uuid queue), envelope: %w(receiver_id), coordinator_record: %w(id owner_id class), delayed: %w(execution_plan_uuid start_at start_before args_serializer frozen), - output_chunk: %w(execution_plan_uuid action_id kind timestamp) } + output_chunk: %w(execution_plan_uuid action_id kind timestamp), + execution_plan_dependency: %w(execution_plan_uuid blocked_by_uuid) } SERIALIZABLE_COLUMNS = { action: %w(input output), delayed: %w(serialized_args), @@ -153,12 +154,31 @@ def find_old_execution_plans(age) records.map { |plan| execution_plan_column_map(load_data plan, table_name) } end - def find_past_delayed_plans(time) + def find_execution_plan_dependencies(execution_plan_id) + table(:execution_plan_dependency) + .where(execution_plan_uuid: execution_plan_id) + .select_map(:blocked_by_uuid) + end + + def find_blocked_execution_plans(execution_plan_id) + table(:execution_plan_dependency) + .where(blocked_by_uuid: execution_plan_id) + .select_map(:execution_plan_uuid) + end + + def find_ready_delayed_plans(time) table_name = :delayed + # Subquery to find delayed plans that have at least one non-stopped dependency + plans_with_unfinished_deps = table(:execution_plan_dependency) + .join(TABLES[:execution_plan], uuid: :blocked_by_uuid) + .where(::Sequel.~(state: 'stopped')) + .select(:execution_plan_uuid) + records = with_retry do table(table_name) - .where(::Sequel.lit('start_at <= ? OR (start_before IS NOT NULL AND start_before <= ?)', time, time)) + .where(::Sequel.lit('start_at IS NULL OR (start_at <= ? OR (start_before IS NOT NULL AND start_before <= ?))', time, time)) .where(:frozen => false) + .exclude(execution_plan_uuid: plans_with_unfinished_deps) .order_by(:start_at) .all end @@ -175,6 +195,10 @@ def save_delayed_plan(execution_plan_id, value) save :delayed, { execution_plan_uuid: execution_plan_id }, value, with_data: false end + def chain_execution_plan(first, second) + save :execution_plan_dependency, {}, { execution_plan_uuid: second, blocked_by_uuid: first }, with_data: false + end + def load_step(execution_plan_id, step_id) load :step, execution_plan_uuid: execution_plan_id, id: step_id end @@ -319,7 +343,8 @@ def abort_if_pending_migrations! envelope: :dynflow_envelopes, coordinator_record: :dynflow_coordinator_records, delayed: :dynflow_delayed_plans, - output_chunk: :dynflow_output_chunks } + output_chunk: :dynflow_output_chunks, + execution_plan_dependency: :dynflow_execution_plan_dependencies } def table(which) db[TABLES.fetch(which)] diff --git a/lib/dynflow/persistence_adapters/sequel_migrations/025_create_execution_plan_dependencies.rb b/lib/dynflow/persistence_adapters/sequel_migrations/025_create_execution_plan_dependencies.rb new file mode 100644 index 00000000..69717d02 --- /dev/null +++ b/lib/dynflow/persistence_adapters/sequel_migrations/025_create_execution_plan_dependencies.rb @@ -0,0 +1,22 @@ +# frozen_string_literal: true + +Sequel.migration do + up do + type = database_type + create_table(:dynflow_execution_plan_dependencies) do + column_properties = if type.to_s.include?('postgres') + { type: :uuid } + else + { type: String, size: 36, fixed: true, null: false } + end + foreign_key :execution_plan_uuid, :dynflow_execution_plans, on_delete: :cascade, **column_properties + foreign_key :blocked_by_uuid, :dynflow_execution_plans, on_delete: :cascade, **column_properties + index :blocked_by_uuid + index :execution_plan_uuid + end + end + + down do + drop_table(:dynflow_execution_plan_dependencies) + end +end diff --git a/lib/dynflow/world.rb b/lib/dynflow/world.rb index 8d7433db..2dc1856f 100644 --- a/lib/dynflow/world.rb +++ b/lib/dynflow/world.rb @@ -202,6 +202,16 @@ def delay_with_options(action_class:, args:, delay_options:, id: nil, caller_act Scheduled[execution_plan.id] end + def chain(plan_uuids, action_class, *args) + plan_uuids = [plan_uuids] unless plan_uuids.is_a? Array + result = delay_with_options(action_class: action_class, args: args, delay_options: { frozen: true }) + plan_uuids.each do |plan_uuid| + persistence.chain_execution_plan(plan_uuid, result.execution_plan_id) + end + persistence.set_delayed_plan_frozen(result.execution_plan_id, false) + result + end + def plan_elsewhere(action_class, *args) execution_plan = ExecutionPlan.new(self, nil) execution_plan.delay(nil, action_class, {}, *args) diff --git a/test/future_execution_test.rb b/test/future_execution_test.rb index fee3377d..ebe56c1c 100644 --- a/test/future_execution_test.rb +++ b/test/future_execution_test.rb @@ -76,7 +76,7 @@ module FutureExecutionTest it 'finds delayed plans' do @start_at = Time.now.utc - 100 delayed_plan - past_delayed_plans = world.persistence.find_past_delayed_plans(@start_at + 10) + past_delayed_plans = world.persistence.find_ready_delayed_plans(@start_at + 10) _(past_delayed_plans.length).must_equal 1 _(past_delayed_plans.first.execution_plan_uuid).must_equal execution_plan.id end @@ -113,8 +113,8 @@ module FutureExecutionTest it 'checks for delayed plans in regular intervals' do start_time = klok.current_time - persistence.expect(:find_past_delayed_plans, [], [start_time]) - persistence.expect(:find_past_delayed_plans, [], [start_time + options[:poll_interval]]) + persistence.expect(:find_ready_delayed_plans, [], [start_time]) + persistence.expect(:find_ready_delayed_plans, [], [start_time + options[:poll_interval]]) dummy_world.stub :persistence, persistence do _(klok.pending_pings.length).must_equal 0 delayed_executor.start.wait @@ -190,6 +190,152 @@ module FutureExecutionTest _(serializer.args).must_equal args end end + + describe 'execution plan chaining' do + let(:world) do + WorldFactory.create_world { |config| config.auto_rescue = true } + end + + before do + @preexisting = world.persistence.find_ready_delayed_plans(Time.now).map(&:execution_plan_uuid) + end + + it 'chains two execution plans' do + plan1 = world.plan(Support::DummyExample::Dummy) + plan2 = world.chain(plan1.id, Support::DummyExample::Dummy) + + Concurrent::Promises.resolvable_future.tap do |promise| + world.execute(plan1.id, promise) + end.wait + + plan1 = world.persistence.load_execution_plan(plan1.id) + _(plan1.state).must_equal :stopped + ready = world.persistence.find_ready_delayed_plans(Time.now).reject { |p| @preexisting.include? p.execution_plan_uuid } + _(ready.count).must_equal 1 + _(ready.first.execution_plan_uuid).must_equal plan2.execution_plan_id + end + + it 'chains onto multiple execution plans and waits for all to finish' do + plan1 = world.plan(Support::DummyExample::Dummy) + plan2 = world.plan(Support::DummyExample::Dummy) + plan3 = world.chain([plan2.id, plan1.id], Support::DummyExample::Dummy) + + # Execute and complete plan1 + Concurrent::Promises.resolvable_future.tap do |promise| + world.execute(plan1.id, promise) + end.wait + + plan1 = world.persistence.load_execution_plan(plan1.id) + _(plan1.state).must_equal :stopped + + # plan3 should still not be ready because plan2 hasn't finished yet + ready = world.persistence.find_ready_delayed_plans(Time.now).reject { |p| @preexisting.include? p.execution_plan_uuid } + _(ready.count).must_equal 0 + + # Execute and complete plan2 + Concurrent::Promises.resolvable_future.tap do |promise| + world.execute(plan2.id, promise) + end.wait + + plan2 = world.persistence.load_execution_plan(plan2.id) + _(plan2.state).must_equal :stopped + + # Now plan3 should be ready since both plan1 and plan2 are complete + ready = world.persistence.find_ready_delayed_plans(Time.now).reject { |p| @preexisting.include? p.execution_plan_uuid } + _(ready.count).must_equal 1 + _(ready.first.execution_plan_uuid).must_equal plan3.execution_plan_id + end + + it 'cancels the chained plan if the prerequisite fails' do + plan1 = world.plan(Support::DummyExample::FailingDummy) + plan2 = world.chain(plan1.id, Support::DummyExample::Dummy) + + Concurrent::Promises.resolvable_future.tap do |promise| + world.execute(plan1.id, promise) + end.wait + + plan1 = world.persistence.load_execution_plan(plan1.id) + _(plan1.state).must_equal :stopped + _(plan1.result).must_equal :error + + # plan2 will appear in ready delayed plans + ready = world.persistence.find_ready_delayed_plans(Time.now).reject { |p| @preexisting.include? p.execution_plan_uuid } + _(ready.map(&:execution_plan_uuid)).must_equal [plan2.execution_plan_id] + + # Process the delayed plan through the director + work_item = Dynflow::Director::PlanningWorkItem.new(plan2.execution_plan_id, :default, world.id) + work_item.world = world + work_item.execute + + # Now plan2 should be stopped with error due to failed dependency + plan2 = world.persistence.load_execution_plan(plan2.execution_plan_id) + _(plan2.state).must_equal :stopped + _(plan2.result).must_equal :error + _(plan2.errors.first.message).must_match(/prerequisite execution plans failed/) + _(plan2.errors.first.message).must_match(/#{plan1.id}/) + end + + it 'cancels the chained plan if at least one prerequisite fails' do + plan1 = world.plan(Support::DummyExample::Dummy) + plan2 = world.plan(Support::DummyExample::FailingDummy) + plan3 = world.chain([plan1.id, plan2.id], Support::DummyExample::Dummy) + + # Execute and complete plan1 successfully + Concurrent::Promises.resolvable_future.tap do |promise| + world.execute(plan1.id, promise) + end.wait + + plan1 = world.persistence.load_execution_plan(plan1.id) + _(plan1.state).must_equal :stopped + _(plan1.result).must_equal :success + + # plan3 should still not be ready because plan2 hasn't finished yet + ready = world.persistence.find_ready_delayed_plans(Time.now).reject { |p| @preexisting.include? p.execution_plan_uuid } + _(ready).must_equal [] + + # Execute and complete plan2 with failure + Concurrent::Promises.resolvable_future.tap do |promise| + world.execute(plan2.id, promise) + end.wait + + plan2 = world.persistence.load_execution_plan(plan2.id) + _(plan2.state).must_equal :stopped + _(plan2.result).must_equal :error + + # plan3 will now appear in ready delayed plans even though one prerequisite failed + ready = world.persistence.find_ready_delayed_plans(Time.now).reject { |p| @preexisting.include? p.execution_plan_uuid } + _(ready.map(&:execution_plan_uuid)).must_equal [plan3.execution_plan_id] + + # Process the delayed plan through the director + work_item = Dynflow::Director::PlanningWorkItem.new(plan3.execution_plan_id, :default, world.id) + work_item.world = world + work_item.execute + + # Now plan3 should be stopped with error due to failed dependency + plan3 = world.persistence.load_execution_plan(plan3.execution_plan_id) + _(plan3.state).must_equal :stopped + _(plan3.result).must_equal :error + _(plan3.errors.first.message).must_match(/prerequisite execution plans failed/) + _(plan3.errors.first.message).must_match(/#{plan2.id}/) + end + + it 'chains runs the chained plan if the prerequisite was halted' do + plan1 = world.plan(Support::DummyExample::Dummy) + plan2 = world.chain(plan1.id, Support::DummyExample::Dummy) + + world.halt(plan1.id) + Concurrent::Promises.resolvable_future.tap do |promise| + world.execute(plan1.id, promise) + end.wait + + plan1 = world.persistence.load_execution_plan(plan1.id) + _(plan1.state).must_equal :stopped + _(plan1.result).must_equal :pending + ready = world.persistence.find_ready_delayed_plans(Time.now).reject { |p| @preexisting.include? p.execution_plan_uuid } + _(ready.count).must_equal 1 + _(ready.first.execution_plan_uuid).must_equal plan2.execution_plan_id + end + end end end end diff --git a/test/persistence_test.rb b/test/persistence_test.rb index aa0e87ec..39841780 100644 --- a/test/persistence_test.rb +++ b/test/persistence_test.rb @@ -342,7 +342,7 @@ def self.it_acts_as_persistence_adapter end end - describe '#find_past_delayed_plans' do + describe '#find_ready_delayed_plans' do it 'finds plans with start_before in past' do start_time = Time.now.utc prepare_and_save_plans @@ -352,7 +352,7 @@ def self.it_acts_as_persistence_adapter adapter.save_delayed_plan('plan3', :execution_plan_uuid => 'plan3', :frozen => false, :start_at => format_time(start_time + 60)) adapter.save_delayed_plan('plan4', :execution_plan_uuid => 'plan4', :frozen => false, :start_at => format_time(start_time - 60), :start_before => format_time(start_time - 60)) - plans = adapter.find_past_delayed_plans(start_time) + plans = adapter.find_ready_delayed_plans(start_time) _(plans.length).must_equal 3 _(plans.map { |plan| plan[:execution_plan_uuid] }).must_equal %w(plan2 plan4 plan1) end @@ -366,10 +366,77 @@ def self.it_acts_as_persistence_adapter adapter.save_delayed_plan('plan2', :execution_plan_uuid => 'plan2', :frozen => true, :start_at => format_time(start_time + 60), :start_before => format_time(start_time - 60)) - plans = adapter.find_past_delayed_plans(start_time) + plans = adapter.find_ready_delayed_plans(start_time) _(plans.length).must_equal 1 _(plans.first[:execution_plan_uuid]).must_equal 'plan1' end + + it 'finds plans with null start_at' do + start_time = Time.now.utc + prepare_and_save_plans + + adapter.save_delayed_plan('plan1', :execution_plan_uuid => 'plan1', :frozen => false) + + plans = adapter.find_ready_delayed_plans(start_time) + _(plans.length).must_equal 1 + _(plans.first[:execution_plan_uuid]).must_equal 'plan1' + end + + it 'properly stored execution plan dependencies' do + prepare_and_save_plans + + adapter.save_delayed_plan('plan1', :execution_plan_uuid => 'plan1', :frozen => false) + adapter.chain_execution_plan('plan2', 'plan1') + adapter.chain_execution_plan('plan3', 'plan1') + dependencies = adapter.find_execution_plan_dependencies('plan1') + _(dependencies.to_set).must_equal ['plan2', 'plan3'].to_set + end + + it 'does not find blocked plans' do + start_time = Time.now.utc + prepare_and_save_plans + + adapter.save_delayed_plan('plan1', :execution_plan_uuid => 'plan1', :frozen => false) + adapter.chain_execution_plan('plan2', 'plan1') + adapter.chain_execution_plan('plan3', 'plan1') + + plans = adapter.find_ready_delayed_plans(start_time) + _(plans.length).must_equal 0 + end + + it 'finds plans which are no longer blocked' do + start_time = Time.now.utc + prepare_and_save_plans + + adapter.save_delayed_plan('plan1', :execution_plan_uuid => 'plan1', :frozen => false) + adapter.chain_execution_plan('plan2', 'plan1') + + plans = adapter.find_ready_delayed_plans(start_time) + _(plans.length).must_equal 1 + _(plans.first[:execution_plan_uuid]).must_equal 'plan1' + end + + it 'does not find plans which are no longer blocked but are frozen' do + start_time = Time.now.utc + prepare_and_save_plans + + adapter.save_delayed_plan('plan1', :execution_plan_uuid => 'plan1', :frozen => true) + adapter.chain_execution_plan('plan2', 'plan1') + + plans = adapter.find_ready_delayed_plans(start_time) + _(plans.length).must_equal 0 + end + + it 'does not find plans which are no longer blocked but their start_at is in the future' do + start_time = Time.now.utc + prepare_and_save_plans + + adapter.save_delayed_plan('plan1', :execution_plan_uuid => 'plan1', :frozen => false, :start_at => start_time + 60) + adapter.chain_execution_plan('plan2', 'plan1') # plan2 is already stopped + + plans = adapter.find_ready_delayed_plans(start_time) + _(plans.length).must_equal 0 + end end describe '#delete_output_chunks' do diff --git a/test/support/dummy_example.rb b/test/support/dummy_example.rb index 94c3a9a7..89578c45 100644 --- a/test/support/dummy_example.rb +++ b/test/support/dummy_example.rb @@ -31,6 +31,10 @@ def run; end class FailingDummy < Dynflow::Action def run; raise 'error'; end + + def rescue_strategy + Dynflow::Action::Rescue::Fail + end end class Slow < Dynflow::Action diff --git a/web/views/show.erb b/web/views/show.erb index 7e895f68..cc2f3665 100644 --- a/web/views/show.erb +++ b/web/views/show.erb @@ -43,6 +43,30 @@ <%= h(@plan.ended_at) %>
+<% dependencies = @plan.world.persistence.find_execution_plan_dependencies(@plan.id) %> +<% if dependencies.any? %> ++ Depends on execution plans: +
+ Blocks execution plans: +