diff --git a/lib/dynflow/action/v2/with_sub_plans.rb b/lib/dynflow/action/v2/with_sub_plans.rb index de553777..476024bb 100644 --- a/lib/dynflow/action/v2/with_sub_plans.rb +++ b/lib/dynflow/action/v2/with_sub_plans.rb @@ -102,7 +102,7 @@ def spawn_plans def increase_counts(planned, failed) output[:planned_count] += planned + failed - output[:failed_count] = output.fetch(:failed_count, 0) + failed + output[:failed_count] = output.fetch(:failed_count, 0) + failed output[:pending_count] = output.fetch(:pending_count, 0) + planned output[:success_count] ||= 0 end @@ -129,12 +129,20 @@ def run_progress end def recalculate_counts - total = total_count - failed = sub_plans_count('state' => %w(paused stopped), 'result' => %w(error warning)) + total = total_count + if output[:cancelled_timestamp] + cancelled_scheduled_plans = sub_plans_count_after(output[:cancelled_timestamp], { 'state' => %w(paused stopped), 'result' => %w(error warning) }) + cancelled_unscheduled_plans = total_count - output[:planned_count] + cancelled = cancelled_unscheduled_plans + cancelled_scheduled_plans + else + cancelled = cancelled_scheduled_plans = 0 + end + failed = sub_plans_count('state' => %w(paused stopped), 'result' => %w(error warning)) - cancelled_scheduled_plans success = sub_plans_count('state' => 'stopped', 'result' => 'success') - output.update(:pending_count => total - failed - success, - :failed_count => failed - output.fetch(:resumed_count, 0), - :success_count => success) + output.update(:pending_count => total - failed - success - cancelled_scheduled_plans, + :failed_count => failed - output.fetch(:resumed_count, 0), + :success_count => success, + :cancelled_count => cancelled) end def counts_set? @@ -142,7 +150,7 @@ def counts_set? end def check_for_errors! - raise SubtaskFailedException.new("A sub task failed") if output[:failed_count] > 0 + raise SubtaskFailedException.new("A sub task failed") if output[:failed_count] + output[:cancelled_count] > 0 end # Helper for creating sub plans @@ -173,6 +181,7 @@ def concurrency_limit_capacity def cancel!(force = false) # Count the not-yet-planned tasks as cancelled output[:cancelled_count] = total_count - output[:planned_count] + output[:cancelled_timestamp] ||= Time.now.utc.iso8601 # time in UTC for comparison with UTC times in the database on_planning_finished if output[:cancelled_count].positive? # Pass the cancel event to running sub plans if they can be cancelled sub_plans(:state => 'running').each { |sub_plan| sub_plan.cancel(force) if sub_plan.cancellable? } @@ -198,7 +207,9 @@ def can_spawn_next_batch? end def remaining_count - total_count - output[:cancelled_count] - output[:planned_count] + return 0 if output[:cancelled_timestamp] + + total_count - output[:planned_count] end private @@ -216,5 +227,9 @@ def sub_plans(filter = {}) def sub_plans_count(filter = {}) world.persistence.find_execution_plan_counts(filters: sub_plan_filter.merge(filter)) end + + def sub_plans_count_after(timestamp, filter = {}) + world.persistence.find_execution_plan_counts_after(timestamp, { filters: sub_plan_filter.merge(filter) }) + end end end diff --git a/lib/dynflow/persistence.rb b/lib/dynflow/persistence.rb index 7facce49..b5e16d04 100644 --- a/lib/dynflow/persistence.rb +++ b/lib/dynflow/persistence.rb @@ -73,6 +73,10 @@ def find_execution_plan_counts(options) adapter.find_execution_plan_counts(options) end + def find_execution_plan_counts_after(timestamp, options) + adapter.find_execution_plan_counts_after(timestamp, options) + end + def delete_execution_plans(filters, batch_size = 1000, enforce_backup_dir = nil) backup_dir = enforce_backup_dir || current_backup_dir adapter.delete_execution_plans(filters, batch_size, backup_dir) diff --git a/lib/dynflow/persistence_adapters/abstract.rb b/lib/dynflow/persistence_adapters/abstract.rb index 94509b06..994fb9b7 100644 --- a/lib/dynflow/persistence_adapters/abstract.rb +++ b/lib/dynflow/persistence_adapters/abstract.rb @@ -46,6 +46,10 @@ def find_execution_plan_counts(options = {}) filter(:execution_plan, options[:filters]).count end + def find_execution_plan_counts_after(timestamp, options = {}) + raise NotImplementedError + end + def find_execution_plan_statuses(options) raise NotImplementedError end diff --git a/lib/dynflow/persistence_adapters/sequel.rb b/lib/dynflow/persistence_adapters/sequel.rb index db91b294..94d05bb5 100644 --- a/lib/dynflow/persistence_adapters/sequel.rb +++ b/lib/dynflow/persistence_adapters/sequel.rb @@ -78,6 +78,10 @@ def find_execution_plan_counts(options = {}) filter(:execution_plan, table(:execution_plan), options[:filters]).count end + def find_execution_plan_counts_after(timestamp, options = {}) + filter(:execution_plan, table(:execution_plan), options[:filters]).filter(::Sequel.lit('ended_at >= ?', timestamp)).count + end + def find_execution_plan_statuses(options) plans = filter(:execution_plan, table(:execution_plan), options[:filters]) .select(:uuid, :state, :result)