Merged
lib/dynflow/persistence_adapters/sequel.rb (63 additions & 45 deletions)
@@ -71,20 +71,23 @@ def find_execution_plans(options = {})
paginate(table(table_name), options),
options),
options[:filters])
data_set.all.map { |record| execution_plan_column_map(load_data(record, table_name)) }
records = with_retry { data_set.all }
records.map { |record| execution_plan_column_map(load_data(record, table_name)) }
end

def find_execution_plan_counts(options = {})
filter(:execution_plan, table(:execution_plan), options[:filters]).count
with_retry { filter(:execution_plan, table(:execution_plan), options[:filters]).count }
end

def find_execution_plan_counts_after(timestamp, options = {})
filter(:execution_plan, table(:execution_plan), options[:filters]).filter(::Sequel.lit('ended_at >= ?', timestamp)).count
with_retry { filter(:execution_plan, table(:execution_plan), options[:filters]).filter(::Sequel.lit('ended_at >= ?', timestamp)).count }
end

def find_execution_plan_statuses(options)
plans = filter(:execution_plan, table(:execution_plan), options[:filters])
.select(:uuid, :state, :result)
plans = with_retry do
filter(:execution_plan, table(:execution_plan), options[:filters])
.select(:uuid, :state, :result)
end

plans.each_with_object({}) do |current, acc|
uuid = current.delete(:uuid)
@@ -94,27 +97,29 @@ def find_execution_plan_statuses(options)

def delete_execution_plans(filters, batch_size = 1000, backup_dir = nil)
count = 0
filter(:execution_plan, table(:execution_plan), filters).each_slice(batch_size) do |plans|
uuids = plans.map { |p| p.fetch(:uuid) }
@db.transaction do
table(:delayed).where(execution_plan_uuid: uuids).delete
with_retry do
filter(:execution_plan, table(:execution_plan), filters).each_slice(batch_size) do |plans|
uuids = plans.map { |p| p.fetch(:uuid) }
@db.transaction do
table(:delayed).where(execution_plan_uuid: uuids).delete

steps = table(:step).where(execution_plan_uuid: uuids)
backup_to_csv(:step, steps, backup_dir, 'steps.csv') if backup_dir
steps.delete
steps = table(:step).where(execution_plan_uuid: uuids)
backup_to_csv(:step, steps, backup_dir, 'steps.csv') if backup_dir
steps.delete

table(:output_chunk).where(execution_plan_uuid: uuids).delete
output_chunks = table(:output_chunk).where(execution_plan_uuid: uuids).delete

actions = table(:action).where(execution_plan_uuid: uuids)
backup_to_csv(:action, actions, backup_dir, 'actions.csv') if backup_dir
actions.delete
actions = table(:action).where(execution_plan_uuid: uuids)
backup_to_csv(:action, actions, backup_dir, 'actions.csv') if backup_dir
actions.delete

execution_plans = table(:execution_plan).where(uuid: uuids)
backup_to_csv(:execution_plan, execution_plans, backup_dir, 'execution_plans.csv') if backup_dir
count += execution_plans.delete
execution_plans = table(:execution_plan).where(uuid: uuids)
backup_to_csv(:execution_plan, execution_plans, backup_dir, 'execution_plans.csv') if backup_dir
count += execution_plans.delete
end
end
return count
end
return count
end

def load_execution_plan(execution_plan_id)
@@ -127,30 +132,37 @@ def save_execution_plan(execution_plan_id, value)

def delete_delayed_plans(filters, batch_size = 1000)
count = 0
filter(:delayed, table(:delayed), filters).each_slice(batch_size) do |plans|
uuids = plans.map { |p| p.fetch(:execution_plan_uuid) }
@db.transaction do
count += table(:delayed).where(execution_plan_uuid: uuids).delete
with_retry do
filter(:delayed, table(:delayed), filters).each_slice(batch_size) do |plans|
uuids = plans.map { |p| p.fetch(:execution_plan_uuid) }
@db.transaction do
count += table(:delayed).where(execution_plan_uuid: uuids).delete
end
end
end
count
end

def find_old_execution_plans(age)
table_name = :execution_plan
table(table_name)
.where(::Sequel.lit('ended_at <= ? AND state = ?', age, 'stopped'))
.all.map { |plan| execution_plan_column_map(load_data plan, table_name) }
records = with_retry do
table(table_name)
.where(::Sequel.lit('ended_at <= ? AND state = ?', age, 'stopped'))
.all
end
records.map { |plan| execution_plan_column_map(load_data plan, table_name) }
end

def find_past_delayed_plans(time)
table_name = :delayed
table(table_name)
.where(::Sequel.lit('start_at <= ? OR (start_before IS NOT NULL AND start_before <= ?)', time, time))
.where(:frozen => false)
.order_by(:start_at)
.all
.map { |plan| load_data(plan, table_name) }
records = with_retry do
table(table_name)
.where(::Sequel.lit('start_at <= ? OR (start_before IS NOT NULL AND start_before <= ?)', time, time))
.where(:frozen => false)
.order_by(:start_at)
.all
end
records.map { |plan| load_data(plan, table_name) }
end

def load_delayed_plan(execution_plan_id)
@@ -205,7 +217,9 @@ def load_output_chunks(execution_plan_id, action_id)
end

def delete_output_chunks(execution_plan_id, action_id)
filter(:output_chunk, table(:output_chunk), { execution_plan_uuid: execution_plan_id, action_id: action_id }).delete
with_retry do
filter(:output_chunk, table(:output_chunk), { execution_plan_uuid: execution_plan_id, action_id: action_id }).delete
end
end

def connector_feature!
@@ -221,28 +235,30 @@ def save_envelope(data)

def pull_envelopes(receiver_id)
connector_feature!
db.transaction do
data_set = table(:envelope).where(receiver_id: receiver_id).all
envelopes = data_set.map { |record| load_data(record) }
with_retry do
db.transaction do
data_set = table(:envelope).where(receiver_id: receiver_id).all
envelopes = data_set.map { |record| load_data(record) }

table(:envelope).where(id: data_set.map { |d| d[:id] }).delete
return envelopes
table(:envelope).where(id: data_set.map { |d| d[:id] }).delete
return envelopes
end
end
end

def push_envelope(envelope)
connector_feature!
table(:envelope).insert(prepare_record(:envelope, envelope))
with_retry { table(:envelope).insert(prepare_record(:envelope, envelope)) }
end

def prune_envelopes(receiver_ids)
connector_feature!
table(:envelope).where(receiver_id: receiver_ids).delete
with_retry { table(:envelope).where(receiver_id: receiver_ids).delete }
end

def prune_undeliverable_envelopes
connector_feature!
table(:envelope).where(receiver_id: table(:coordinator_record).select(:id)).invert.delete
with_retry { table(:envelope).where(receiver_id: table(:coordinator_record).select(:id)).invert.delete }
end

def coordinator_feature!
@@ -263,7 +279,7 @@ def update_coordinator_record(class_name, record_id, value)

def delete_coordinator_record(class_name, record_id)
coordinator_feature!
table(:coordinator_record).where(class: class_name, id: record_id).delete
with_retry { table(:coordinator_record).where(class: class_name, id: record_id).delete }
end

def find_coordinator_records(options)
Expand All @@ -275,7 +291,9 @@ def find_coordinator_records(options)
if exclude_owner_id
data_set = data_set.exclude(:owner_id => exclude_owner_id)
end
data_set.all.map { |record| load_data(record) }
with_retry do
data_set.all.map { |record| load_data(record) }
end
end

def to_hash
@@ -444,7 +462,7 @@ def backup_to_csv(table_name, dataset, backup_dir, file_name)
end

def delete(what, condition)
table(what).where(Utils.symbolize_keys(condition)).delete
with_retry { table(what).where(Utils.symbolize_keys(condition)).delete }
end

def extract_metadata(what, value)
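Note: the `with_retry` helper that the hunks above wrap around each query is not itself part of this diff. A minimal sketch of what such a wrapper might look like, assuming a fixed attempt cap, linear backoff, and the "Persistence retry no. N" log line that the bats test at the bottom of this page greps for (the constant, the rescued exception classes, and the log format are all assumptions, not taken from the Dynflow source):

```ruby
# Sketch only; the real helper lives in the Sequel adapter and is not shown here.
module PersistenceRetrySketch
  RETRY_ATTEMPTS = 5 # assumed cap on attempts

  def with_retry
    attempts = 0
    begin
      yield
    rescue ::Sequel::DatabaseError => e
      attempts += 1
      # The bats test below waits for a "Persistence retry no. N" line,
      # so the helper presumably logs each failed attempt before retrying.
      warn "dynflow: Persistence retry no. #{attempts}"
      if attempts < RETRY_ATTEMPTS
        sleep attempts # simple linear backoff between attempts
        retry
      end
      # Once retries are exhausted, surface a fatal error that callers such as
      # World#start_termination (see the world.rb diff below) can rescue.
      raise Dynflow::Errors::FatalPersistenceError, "caused by #{e.class}: #{e.message}"
    end
  end
end
```

Used this way, a transient connection drop makes the adapter block and retry, while a permanent outage eventually bubbles up as a fatal persistence error that the world-level termination code can react to.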
lib/dynflow/world.rb (24 additions & 13 deletions)
@@ -325,17 +325,7 @@ def start_termination
logger.info "start terminating throttle_limiter..."
throttle_limiter.terminate.wait(termination_timeout)

if executor
connector.stop_receiving_new_work(self, termination_timeout)

logger.info "start terminating executor..."
executor.terminate.wait(termination_timeout)

logger.info "start terminating executor dispatcher..."
executor_dispatcher_terminated = Concurrent::Promises.resolvable_future
executor_dispatcher.ask([:start_termination, executor_dispatcher_terminated])
executor_dispatcher_terminated.wait(termination_timeout)
end
terminate_executor

logger.info "start terminating client dispatcher..."
client_dispatcher_terminated = Concurrent::Promises.resolvable_future
@@ -350,7 +340,11 @@ def start_termination
clock.ask(:terminate!).wait(termination_timeout)
end

coordinator.delete_world(registered_world, true)
begin
coordinator.delete_world(registered_world, true)
rescue Dynflow::Errors::FatalPersistenceError => e
nil
end
@terminated.resolve
true
rescue => e
@@ -361,7 +355,10 @@ def start_termination
termination_future.wait(termination_timeout)
end.on_resolution do
@terminated.resolve
Thread.new { Kernel.exit } if @exit_on_terminate.true?
Thread.new do
logger.info 'World terminated, exiting.'
Kernel.exit if @exit_on_terminate.true?
end
end
end
end
@@ -395,6 +392,20 @@ def spawn_and_wait(klass, name, *args)
initialized.wait
return actor
end

def terminate_executor
return unless executor

connector.stop_receiving_new_work(self, termination_timeout)

logger.info "start terminating executor..."
executor.terminate.wait(termination_timeout)

logger.info "start terminating executor dispatcher..."
executor_dispatcher_terminated = Concurrent::Promises.resolvable_future
executor_dispatcher.ask([:start_termination, executor_dispatcher_terminated])
executor_dispatcher_terminated.wait(termination_timeout)
end
end
# rubocop:enable Metrics/ClassLength
end
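The extracted `terminate_executor` helper keeps the resolvable-future handshake the method already used for the client dispatcher: create a future, hand it to the actor being shut down, and wait on it with a timeout. A minimal, self-contained illustration of that pattern follows (a plain Thread stands in for the executor dispatcher actor; this is not Dynflow code):

```ruby
require 'concurrent'

# Illustration of the resolvable-future handshake used in terminate_executor.
done = Concurrent::Promises.resolvable_future

worker = Thread.new do
  sleep 0.1    # stand-in for the dispatcher finishing its termination work
  done.resolve # signal completion back to the waiting caller
end

done.wait(5)   # caller blocks for at most 5 seconds, like termination_timeout
worker.join
```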
test/bats/sidekiq-orchestrator.bats (31 additions & 0 deletions)
@@ -85,3 +85,34 @@ teardown()
kill -15 "$(cat "$TEST_PIDDIR/o1.pid")"
wait_for 120 1 grep 'dynflow: Acquired orchestrator lock, entering active mode.' "$(bg_output_file o2)"
}

@test "active orchestrator exits when pg goes away for good" {
cd "$(get_project_root)"

run_background 'o1' bundle exec sidekiq -r ./examples/remote_executor.rb -q dynflow_orchestrator -c 1
wait_for 30 1 grep 'dynflow: Acquired orchestrator lock, entering active mode.' "$(bg_output_file o1)"

run_background 'w1' bundle exec sidekiq -r ./examples/remote_executor.rb -q default
wait_for 5 1 grep 'dynflow: Finished performing validity checks' "$(bg_output_file o1)"

podman stop "$POSTGRES_CONTAINER_NAME"
wait_for 60 1 grep 'dynflow: World terminated, exiting.' "$(bg_output_file o1)"
}

@test "active orchestrator can withstand temporary pg connection drop" {
cd "$(get_project_root)"

run_background 'o1' bundle exec sidekiq -r ./examples/remote_executor.rb -q dynflow_orchestrator -c 1
wait_for 30 1 grep 'dynflow: Acquired orchestrator lock, entering active mode.' "$(bg_output_file o1)"

run_background 'w1' bundle exec sidekiq -r ./examples/remote_executor.rb -q default
wait_for 5 1 grep 'dynflow: Finished performing validity checks' "$(bg_output_file o1)"

podman stop "$POSTGRES_CONTAINER_NAME"
wait_for 30 1 grep 'dynflow: Persistence retry no. 1' "$(bg_output_file o1)"
podman start "$POSTGRES_CONTAINER_NAME"
wait_for 30 1 grep 'dynflow: Executor heartbeat' "$(bg_output_file o1)"

timeout 30 bundle exec ruby examples/remote_executor.rb client 1
wait_for 1 1 grep -P 'dynflow: ExecutionPlan.*running >>.*stopped' "$(bg_output_file o1)"
}