Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 37 additions & 26 deletions lib/cloud_controller/diego/tasks_sync.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,37 +19,25 @@ def sync

diego_tasks = bbs_task_client.fetch_tasks.index_by(&:task_guid)

batched_cc_tasks do |cc_tasks|
tasks_to_fail = []
to_update = []
to_cancel = []

batched_cc_tasks do |cc_tasks|
cc_tasks.each do |cc_task|
diego_task = diego_tasks.delete(cc_task.guid)
next unless [TaskModel::RUNNING_STATE, TaskModel::CANCELING_STATE].include? cc_task.state

if diego_task.nil?
tasks_to_fail << cc_task.guid if diego_task_missing?(cc_task.guid) && !task_finished_while_iterating?(cc_task.guid)
logger.info('missing-diego-task', task_guid: cc_task.guid)
to_update << cc_task.guid
elsif cc_task.state == TaskModel::CANCELING_STATE
workpool.submit(cc_task.guid) do |guid|
bbs_task_client.cancel_task(guid)
logger.info('canceled-cc-task', task_guid: guid)
end
end
end

unless tasks_to_fail.empty?
TaskModel.where(guid: tasks_to_fail).each do |cc_task|
cc_task.update(state: TaskModel::FAILED_STATE, failure_reason: BULKER_TASK_FAILURE)
to_cancel << cc_task.guid
end
end
end

diego_tasks.each_key do |task_guid|
workpool.submit(task_guid) do |guid|
bbs_task_client.cancel_task(guid)
logger.info('missing-cc-task', task_guid: guid)
end
end
update_missing_diego_tasks(to_update)
cancel_cc_tasks(to_cancel)
cancel_missing_cc_tasks(diego_tasks)

workpool.drain

Expand Down Expand Up @@ -87,21 +75,44 @@ def formatted_backtrace_from_error(error)
error.backtrace.present? ? error.backtrace.join("\n") + "\n..." : ''
end

def diego_task_missing?(task_guid)
bbs_task_client.fetch_task(task_guid).nil?
# Marks CC tasks as failed when their Diego counterpart is confirmed missing.
#
# For every guid in +to_update+ a job is submitted to +workpool+. Each job
# asks +bbs_task_client+ for that single task again and only acts when the
# fetch returns nil.
#
# @param to_update [Enumerable<String>] guids of CC tasks that had no matching
#   entry in the earlier bulk fetch (NOTE(review): element type inferred from
#   the `guid:` lookup below — confirm against the caller).
def update_missing_diego_tasks(to_update)
to_update.each do |task_guid|
# The guid is handed to the job as an argument and received as the block parameter.
workpool.submit(task_guid) do |guid|
# Per-task re-check against BBS before touching the CC record.
diego_task_missing = bbs_task_client.fetch_task(guid).nil?
if diego_task_missing
# Mark the CC task as failed. Don't update tasks that are already in a terminal state.
task = TaskModel.where(guid:).exclude(state: [TaskModel::FAILED_STATE, TaskModel::SUCCEEDED_STATE]).first
# `task` is nil when no non-terminal row matches; safe navigation then skips the update.
task&.update(state: TaskModel::FAILED_STATE, failure_reason: BULKER_TASK_FAILURE) # invoke model's update method to create an event
# Logged whenever the Diego task is missing, whether or not a CC row was updated.
logger.info('missing-diego-task', task_guid: guid)
end
end
end
end

# Asks Diego to cancel each of the given CC tasks.
#
# One job per guid is submitted to +workpool+; the job calls
# +bbs_task_client.cancel_task+ and then logs 'canceled-cc-task'.
#
# @param to_cancel [Enumerable<String>] guids of the tasks to cancel
# @return the +to_cancel+ collection itself (result of +each+)
def cancel_cc_tasks(to_cancel)
  # A single reusable job body; the guid arrives as the block argument
  # supplied through workpool.submit.
  cancel_job = proc do |submitted_guid|
    bbs_task_client.cancel_task(submitted_guid)
    logger.info('canceled-cc-task', task_guid: submitted_guid)
  end

  to_cancel.each { |cc_task_guid| workpool.submit(cc_task_guid, &cancel_job) }
end

def task_finished_while_iterating?(task_guid)
cc_task = TaskModel.find(guid: task_guid)
[TaskModel::FAILED_STATE, TaskModel::SUCCEEDED_STATE].include?(cc_task.state)
# Cancels Diego tasks identified by the keys of the given hash.
#
# For every key a job is submitted to +workpool+ that calls
# +bbs_task_client.cancel_task+ and then logs 'missing-cc-task'.
#
# @param to_cancel_missing [Hash] keyed by task guid; only the keys are read
# @return the +to_cancel_missing+ hash itself (result of +each_key+)
def cancel_missing_cc_tasks(to_cancel_missing)
  # A single reusable job body; the guid arrives as the block argument
  # supplied through workpool.submit.
  cancel_job = proc do |submitted_guid|
    bbs_task_client.cancel_task(submitted_guid)
    logger.info('missing-cc-task', task_guid: submitted_guid)
  end

  to_cancel_missing.each_key { |diego_task_guid| workpool.submit(diego_task_guid, &cancel_job) }
end

def batched_cc_tasks
last_id = 0
loop do
tasks = TaskModel.where(
Sequel.lit('tasks.id > ?', last_id)
).order(:id).limit(BATCH_SIZE).all
).order(:id).limit(BATCH_SIZE).select(:id, :guid, :state).all

yield tasks
return if tasks.count < BATCH_SIZE
Expand Down
6 changes: 1 addition & 5 deletions spec/db_spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,7 @@
end

rspec_config.around do |example|
# DatabaseIsolation requires the api config context
TestConfig.context = :api
TestConfig.reset

isolation = DatabaseIsolation.choose(example.metadata[:isolation], TestConfig.config_instance, DbConfig.new.connection)
isolation = DatabaseIsolation.choose(example.metadata[:isolation], DbConfig.new.connection)
isolation.cleanly { example.run }
end
end
Expand Down
2 changes: 1 addition & 1 deletion spec/performance/packages_controller_index_spec.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
require 'spec_helper'
require 'rails_helper'

RSpec.describe PackagesController, type: :controller do # , isolation: :truncation
RSpec.describe PackagesController, type: :controller do
describe '#index' do
let(:user) { set_current_user(VCAP::CloudController::User.make) }
let(:app_model) { VCAP::CloudController::AppModel.make }
Expand Down
20 changes: 8 additions & 12 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,13 @@
end

rspec_config.before do
Delayed::Worker.destroy_failed_jobs = false
Sequel::Deprecation.output = StringIO.new
Sequel::Deprecation.backtrace_filter = 5

TestConfig.context = example.metadata[:job_context] || :api
TestConfig.reset

Fog::Mock.reset

if Fog.mock?
Expand All @@ -175,13 +182,6 @@
CloudController::DependencyLocator.instance.buildpack_blobstore.ensure_bucket_exists
end

Delayed::Worker.destroy_failed_jobs = false
Sequel::Deprecation.output = StringIO.new
Sequel::Deprecation.backtrace_filter = 5

TestConfig.context = example.metadata[:job_context] || :api
TestConfig.reset

VCAP::CloudController::SecurityContext.clear
allow_any_instance_of(VCAP::CloudController::UaaTokenDecoder).to receive(:uaa_issuer).and_return(UAAIssuer::ISSUER)

Expand All @@ -190,11 +190,7 @@
end

rspec_config.around do |example|
# DatabaseIsolation requires the api config context
TestConfig.context = :api
TestConfig.reset

isolation = DatabaseIsolation.choose(example.metadata[:isolation], TestConfig.config_instance, DbConfig.new.connection)
isolation = DatabaseIsolation.choose(example.metadata[:isolation], DbConfig.new.connection)
isolation.cleanly { example.run }
end

Expand Down
12 changes: 7 additions & 5 deletions spec/support/database_isolation.rb
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
module DatabaseIsolation
def self.choose(isolation, config, db)
def self.choose(isolation, db)
case isolation
when :truncation
TruncateTables.new(config, db)
TruncateTables.new(db)
else
RollbackTransaction.new
end
end

class TruncateTables
def initialize(config, db)
@config = config
def initialize(db)
@db = db
end

Expand All @@ -24,7 +23,10 @@ def reset_tables
table_truncator = TableTruncator.new(db)
table_truncator.truncate_tables

VCAP::CloudController::Seeds.write_seed_data(config)
# VCAP::CloudController::Seeds requires the :api config
TestConfig.context = :api
TestConfig.reset
VCAP::CloudController::Seeds.write_seed_data(TestConfig.config_instance)
end

private
Expand Down
8 changes: 5 additions & 3 deletions spec/unit/lib/cloud_controller/diego/tasks_sync_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ module Diego
end
end

context 'when a running CC task is missing from BBS' do
context 'when a running CC task is missing from BBS', isolation: :truncation do
# Can't use transactions for isolation because we're using multiple threads
let!(:running_task) { TaskModel.make(:running, created_at: 1.minute.ago) }
let!(:canceling_task) { TaskModel.make(:canceling, created_at: 1.minute.ago) }
let!(:start_event_for_running_task) { AppUsageEvent.make(task_guid: running_task.guid, state: 'TASK_STARTED') }
Expand Down Expand Up @@ -270,7 +271,7 @@ module Diego
end
end

context 'when a new task is created after cc initally fetches tasks from bbs' do
context 'when a new task is created after cc initially fetches tasks from bbs' do
context 'and the newly started task does not complete before checking to see if it should fail' do
let!(:cc_task) { TaskModel.make(guid: 'some-task-guid', state: TaskModel::RUNNING_STATE) }
let(:bbs_task) { ::Diego::Bbs::Models::Task.new(task_guid: 'some-task-guid', state: ::Diego::Bbs::Models::Task::State::Running) }
Expand Down Expand Up @@ -298,7 +299,8 @@ module Diego
end
end

context 'and the newly started task completes before the iteration completes' do
context 'and the newly started task completes before the iteration completes', isolation: :truncation do
# Can't use transactions for isolation because we're using multiple threads
let!(:cc_task) { TaskModel.make(guid: 'some-task-guid', state: TaskModel::RUNNING_STATE) }
let(:bbs_tasks) { [] }

Expand Down