Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 23 additions & 8 deletions lib/dynflow/action/v2/with_sub_plans.rb
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def spawn_plans

# Folds the outcome of a planning step into the counters stored in +output+.
#
# @param planned [Integer] amount added to the planned and pending counters
# @param failed [Integer] amount added to the planned and failed counters
def increase_counts(planned, failed)
  # Failed sub plans still count as planned.
  output[:planned_count] += planned + failed
  # Apply the failure increment exactly once; repeating this statement would
  # double-count every failure.
  output[:failed_count] = output.fetch(:failed_count, 0) + failed
  output[:pending_count] = output.fetch(:pending_count, 0) + planned
  # Only initialise the success counter here, never overwrite an existing value.
  output[:success_count] ||= 0
end
Expand All @@ -129,20 +129,28 @@ def run_progress
end

def recalculate_counts
total = total_count
failed = sub_plans_count('state' => %w(paused stopped), 'result' => %w(error warning))
total = total_count
if output[:cancelled_timestamp]
cancelled_scheduled_plans = sub_plans_count_after(output[:cancelled_timestamp], { 'state' => %w(paused stopped), 'result' => %w(error warning) })

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this bulletproof enough? Can the following happen?

  1. I run a job, it gets scheduled
  2. At time T, I cancel it and that time becomes cancelled_timestamp
  3. Before the run is actually cancelled, at time T+1, an error occurs so the task becomes stopped/error
  4. At T+2, an already failed run gets cancelled and nothing happens
    => The run failed but is counted as cancelled

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it failed after being marked as cancelled, then I would assume it is OK to count it as cancelled.

cancelled_unscheduled_plans = total_count - output[:planned_count]
cancelled = cancelled_unscheduled_plans + cancelled_scheduled_plans
else
cancelled = cancelled_scheduled_plans = 0
end
failed = sub_plans_count('state' => %w(paused stopped), 'result' => %w(error warning)) - cancelled_scheduled_plans
success = sub_plans_count('state' => 'stopped', 'result' => 'success')
output.update(:pending_count => total - failed - success,
:failed_count => failed - output.fetch(:resumed_count, 0),
:success_count => success)
output.update(:pending_count => total - failed - success - cancelled_scheduled_plans,
:failed_count => failed - output.fetch(:resumed_count, 0),
:success_count => success,
:cancelled_count => cancelled)
end

# Checks that the total, success, failed and pending counters are all present
# in +output+.
#
# @return [Object, nil, false] the pending counter when all four are truthy,
#   otherwise the first falsy value encountered
def counts_set?
  required = %i[total_count success_count failed_count pending_count]
  # Folding with && skips the remaining lookups once a counter is missing.
  required.reduce(true) { |present, key| present && output[key] }
end

# Raises when any sub plan failed or was cancelled.
#
# A single check over failed + cancelled covers the failed-only case too, so
# no separate failed-only raise is needed.
#
# @raise [SubtaskFailedException] if the failed and cancelled counters sum to
#   more than zero
def check_for_errors!
  # A counter missing from output is treated as zero instead of raising TypeError.
  unsuccessful = output.fetch(:failed_count, 0) + output.fetch(:cancelled_count, 0)
  raise SubtaskFailedException, "A sub task failed" if unsuccessful > 0
end

# Helper for creating sub plans
Expand Down Expand Up @@ -173,6 +181,7 @@ def concurrency_limit_capacity
def cancel!(force = false)
# Count the not-yet-planned tasks as cancelled
output[:cancelled_count] = total_count - output[:planned_count]
output[:cancelled_timestamp] ||= Time.now.utc.iso8601 # time in UTC for comparison with UTC times in the database
on_planning_finished if output[:cancelled_count].positive?
# Pass the cancel event to running sub plans if they can be cancelled
sub_plans(:state => 'running').each { |sub_plan| sub_plan.cancel(force) if sub_plan.cancellable? }
Expand All @@ -198,7 +207,9 @@ def can_spawn_next_batch?
end

# Number of sub plans that are still to be planned.
#
# @return [Integer] 0 once a cancellation timestamp is recorded in +output+,
#   otherwise the total count minus the planned count
def remaining_count
  # After cancellation no further sub plans are reported as remaining.
  return 0 if output[:cancelled_timestamp]

  total_count - output[:planned_count]
end

private
Expand All @@ -216,5 +227,9 @@ def sub_plans(filter = {})
# Asks the world's persistence for the count of execution plans matching
# +sub_plan_filter+ merged with the extra +filter+ conditions.
#
# @param filter [Hash] additional conditions merged over +sub_plan_filter+
# @return whatever the persistence layer's find_execution_plan_counts returns
def sub_plans_count(filter = {})
  persistence = world.persistence
  conditions = sub_plan_filter.merge(filter)
  persistence.find_execution_plan_counts(filters: conditions)
end

# Like a filtered sub plan count, but goes through the persistence layer's
# find_execution_plan_counts_after with the given +timestamp+.
#
# @param timestamp passed through unchanged to the persistence layer
# @param filter [Hash] additional conditions merged over +sub_plan_filter+
# @return whatever find_execution_plan_counts_after returns
def sub_plans_count_after(timestamp, filter = {})
  persistence = world.persistence
  conditions = sub_plan_filter.merge(filter)
  persistence.find_execution_plan_counts_after(timestamp, { filters: conditions })
end
end
end
4 changes: 4 additions & 0 deletions lib/dynflow/persistence.rb
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ def find_execution_plan_counts(options)
adapter.find_execution_plan_counts(options)
end

# Delegates to the adapter's find_execution_plan_counts_after, passing
# +timestamp+ and +options+ through unchanged and returning its result.
def find_execution_plan_counts_after(timestamp, options)
adapter.find_execution_plan_counts_after(timestamp, options)
end

def delete_execution_plans(filters, batch_size = 1000, enforce_backup_dir = nil)
backup_dir = enforce_backup_dir || current_backup_dir
adapter.delete_execution_plans(filters, batch_size, backup_dir)
Expand Down
4 changes: 4 additions & 0 deletions lib/dynflow/persistence_adapters/abstract.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ def find_execution_plan_counts(options = {})
filter(:execution_plan, options[:filters]).count
end

# Abstract placeholder: always raises NotImplementedError.
# NOTE(review): concrete adapters are presumably expected to override this
# with a count of plans matching options[:filters] relative to +timestamp+.
def find_execution_plan_counts_after(timestamp, options = {})
raise NotImplementedError
end

# Abstract placeholder: always raises NotImplementedError.
# NOTE(review): presumably overridden by concrete adapters — confirm.
def find_execution_plan_statuses(options)
raise NotImplementedError
end
Expand Down
4 changes: 4 additions & 0 deletions lib/dynflow/persistence_adapters/sequel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ def find_execution_plan_counts(options = {})
filter(:execution_plan, table(:execution_plan), options[:filters]).count
end

# Counts execution plans that match options[:filters] and whose ended_at is
# greater than or equal to +timestamp+.
#
# @param timestamp value bound to the placeholder in 'ended_at >= ?'
# @param options [Hash] only the :filters entry is read
# @return the result of calling count on the filtered dataset
def find_execution_plan_counts_after(timestamp, options = {})
  plans = filter(:execution_plan, table(:execution_plan), options[:filters])
  ended_since = ::Sequel.lit('ended_at >= ?', timestamp)
  plans.filter(ended_since).count
end

def find_execution_plan_statuses(options)
plans = filter(:execution_plan, table(:execution_plan), options[:filters])
.select(:uuid, :state, :result)
Expand Down
Loading