diff --git a/lib/dynflow/persistence_adapters/sequel.rb b/lib/dynflow/persistence_adapters/sequel.rb index 94d05bb5..2c321f85 100644 --- a/lib/dynflow/persistence_adapters/sequel.rb +++ b/lib/dynflow/persistence_adapters/sequel.rb @@ -71,20 +71,23 @@ def find_execution_plans(options = {}) paginate(table(table_name), options), options), options[:filters]) - data_set.all.map { |record| execution_plan_column_map(load_data(record, table_name)) } + records = with_retry { data_set.all } + records.map { |record| execution_plan_column_map(load_data(record, table_name)) } end def find_execution_plan_counts(options = {}) - filter(:execution_plan, table(:execution_plan), options[:filters]).count + with_retry { filter(:execution_plan, table(:execution_plan), options[:filters]).count } end def find_execution_plan_counts_after(timestamp, options = {}) - filter(:execution_plan, table(:execution_plan), options[:filters]).filter(::Sequel.lit('ended_at >= ?', timestamp)).count + with_retry { filter(:execution_plan, table(:execution_plan), options[:filters]).filter(::Sequel.lit('ended_at >= ?', timestamp)).count } end def find_execution_plan_statuses(options) - plans = filter(:execution_plan, table(:execution_plan), options[:filters]) - .select(:uuid, :state, :result) + plans = with_retry do + filter(:execution_plan, table(:execution_plan), options[:filters]) + .select(:uuid, :state, :result) + end plans.each_with_object({}) do |current, acc| uuid = current.delete(:uuid) @@ -94,27 +97,29 @@ def find_execution_plan_statuses(options) def delete_execution_plans(filters, batch_size = 1000, backup_dir = nil) count = 0 - filter(:execution_plan, table(:execution_plan), filters).each_slice(batch_size) do |plans| - uuids = plans.map { |p| p.fetch(:uuid) } - @db.transaction do - table(:delayed).where(execution_plan_uuid: uuids).delete + with_retry do + filter(:execution_plan, table(:execution_plan), filters).each_slice(batch_size) do |plans| + uuids = plans.map { |p| p.fetch(:uuid) } + @db.transaction do + table(:delayed).where(execution_plan_uuid: uuids).delete - steps = table(:step).where(execution_plan_uuid: uuids) - backup_to_csv(:step, steps, backup_dir, 'steps.csv') if backup_dir - steps.delete + steps = table(:step).where(execution_plan_uuid: uuids) + backup_to_csv(:step, steps, backup_dir, 'steps.csv') if backup_dir + steps.delete - table(:output_chunk).where(execution_plan_uuid: uuids).delete + output_chunks = table(:output_chunk).where(execution_plan_uuid: uuids).delete - actions = table(:action).where(execution_plan_uuid: uuids) - backup_to_csv(:action, actions, backup_dir, 'actions.csv') if backup_dir - actions.delete + actions = table(:action).where(execution_plan_uuid: uuids) + backup_to_csv(:action, actions, backup_dir, 'actions.csv') if backup_dir + actions.delete - execution_plans = table(:execution_plan).where(uuid: uuids) - backup_to_csv(:execution_plan, execution_plans, backup_dir, 'execution_plans.csv') if backup_dir - count += execution_plans.delete + execution_plans = table(:execution_plan).where(uuid: uuids) + backup_to_csv(:execution_plan, execution_plans, backup_dir, 'execution_plans.csv') if backup_dir + count += execution_plans.delete + end end + return count end - return count end def load_execution_plan(execution_plan_id) @@ -127,10 +132,12 @@ def save_execution_plan(execution_plan_id, value) def delete_delayed_plans(filters, batch_size = 1000) count = 0 - filter(:delayed, table(:delayed), filters).each_slice(batch_size) do |plans| - uuids = plans.map { |p| p.fetch(:execution_plan_uuid) } - @db.transaction do - count += table(:delayed).where(execution_plan_uuid: uuids).delete + with_retry do + filter(:delayed, table(:delayed), filters).each_slice(batch_size) do |plans| + uuids = plans.map { |p| p.fetch(:execution_plan_uuid) } + @db.transaction do + count += table(:delayed).where(execution_plan_uuid: uuids).delete + end end end count @@ -138,19 +145,24 @@ def delete_delayed_plans(filters, batch_size = 1000) def find_old_execution_plans(age) table_name = :execution_plan - table(table_name) - .where(::Sequel.lit('ended_at <= ? AND state = ?', age, 'stopped')) - .all.map { |plan| execution_plan_column_map(load_data plan, table_name) } + records = with_retry do + table(table_name) + .where(::Sequel.lit('ended_at <= ? AND state = ?', age, 'stopped')) + .all + end + records.map { |plan| execution_plan_column_map(load_data plan, table_name) } end def find_past_delayed_plans(time) table_name = :delayed - table(table_name) - .where(::Sequel.lit('start_at <= ? OR (start_before IS NOT NULL AND start_before <= ?)', time, time)) - .where(:frozen => false) - .order_by(:start_at) - .all - .map { |plan| load_data(plan, table_name) } + records = with_retry do + table(table_name) + .where(::Sequel.lit('start_at <= ? OR (start_before IS NOT NULL AND start_before <= ?)', time, time)) + .where(:frozen => false) + .order_by(:start_at) + .all + end + records.map { |plan| load_data(plan, table_name) } end def load_delayed_plan(execution_plan_id) @@ -205,7 +217,9 @@ def load_output_chunks(execution_plan_id, action_id) end def delete_output_chunks(execution_plan_id, action_id) - filter(:output_chunk, table(:output_chunk), { execution_plan_uuid: execution_plan_id, action_id: action_id }).delete + with_retry do + filter(:output_chunk, table(:output_chunk), { execution_plan_uuid: execution_plan_id, action_id: action_id }).delete + end end def connector_feature! @@ -221,28 +235,30 @@ def save_envelope(data) def pull_envelopes(receiver_id) connector_feature! - db.transaction do - data_set = table(:envelope).where(receiver_id: receiver_id).all - envelopes = data_set.map { |record| load_data(record) } + with_retry do + db.transaction do + data_set = table(:envelope).where(receiver_id: receiver_id).all + envelopes = data_set.map { |record| load_data(record) } - table(:envelope).where(id: data_set.map { |d| d[:id] }).delete - return envelopes + table(:envelope).where(id: data_set.map { |d| d[:id] }).delete + return envelopes + end end end def push_envelope(envelope) connector_feature! - table(:envelope).insert(prepare_record(:envelope, envelope)) + with_retry { table(:envelope).insert(prepare_record(:envelope, envelope)) } end def prune_envelopes(receiver_ids) connector_feature! - table(:envelope).where(receiver_id: receiver_ids).delete + with_retry { table(:envelope).where(receiver_id: receiver_ids).delete } end def prune_undeliverable_envelopes connector_feature! - table(:envelope).where(receiver_id: table(:coordinator_record).select(:id)).invert.delete + with_retry { table(:envelope).where(receiver_id: table(:coordinator_record).select(:id)).invert.delete } end def coordinator_feature! @@ -263,7 +279,7 @@ def update_coordinator_record(class_name, record_id, value) def delete_coordinator_record(class_name, record_id) coordinator_feature! - table(:coordinator_record).where(class: class_name, id: record_id).delete + with_retry { table(:coordinator_record).where(class: class_name, id: record_id).delete } end def find_coordinator_records(options) @@ -275,7 +291,9 @@ def find_coordinator_records(options) if exclude_owner_id data_set = data_set.exclude(:owner_id => exclude_owner_id) end - data_set.all.map { |record| load_data(record) } + with_retry do + data_set.all.map { |record| load_data(record) } + end end def to_hash @@ -444,7 +462,7 @@ def backup_to_csv(table_name, dataset, backup_dir, file_name) end def delete(what, condition) - table(what).where(Utils.symbolize_keys(condition)).delete + with_retry { table(what).where(Utils.symbolize_keys(condition)).delete } end def extract_metadata(what, value) diff --git a/lib/dynflow/world.rb b/lib/dynflow/world.rb index 0f3b5f07..8d7433db 100644 --- a/lib/dynflow/world.rb +++ b/lib/dynflow/world.rb @@ -325,17 +325,7 @@ def start_termination logger.info "start terminating throttle_limiter..." throttle_limiter.terminate.wait(termination_timeout) - if executor - connector.stop_receiving_new_work(self, termination_timeout) - - logger.info "start terminating executor..." - executor.terminate.wait(termination_timeout) - - logger.info "start terminating executor dispatcher..." - executor_dispatcher_terminated = Concurrent::Promises.resolvable_future - executor_dispatcher.ask([:start_termination, executor_dispatcher_terminated]) - executor_dispatcher_terminated.wait(termination_timeout) - end + terminate_executor logger.info "start terminating client dispatcher..." client_dispatcher_terminated = Concurrent::Promises.resolvable_future @@ -350,7 +340,11 @@ def start_termination clock.ask(:terminate!).wait(termination_timeout) end - coordinator.delete_world(registered_world, true) + begin + coordinator.delete_world(registered_world, true) + rescue Dynflow::Errors::FatalPersistenceError => e + nil + end @terminated.resolve true rescue => e @@ -361,7 +355,10 @@ def start_termination termination_future.wait(termination_timeout) end.on_resolution do @terminated.resolve - Thread.new { Kernel.exit } if @exit_on_terminate.true? + Thread.new do + logger.info 'World terminated, exiting.' + Kernel.exit if @exit_on_terminate.true? + end end end end @@ -395,6 +392,20 @@ def spawn_and_wait(klass, name, *args) initialized.wait return actor end + + def terminate_executor + return unless executor + + connector.stop_receiving_new_work(self, termination_timeout) + + logger.info "start terminating executor..." + executor.terminate.wait(termination_timeout) + + logger.info "start terminating executor dispatcher..." + executor_dispatcher_terminated = Concurrent::Promises.resolvable_future + executor_dispatcher.ask([:start_termination, executor_dispatcher_terminated]) + executor_dispatcher_terminated.wait(termination_timeout) + end end # rubocop:enable Metrics/ClassLength end diff --git a/test/bats/sidekiq-orchestrator.bats b/test/bats/sidekiq-orchestrator.bats index 1bba027c..13e1d716 100644 --- a/test/bats/sidekiq-orchestrator.bats +++ b/test/bats/sidekiq-orchestrator.bats @@ -85,3 +85,34 @@ teardown() { kill -15 "$(cat "$TEST_PIDDIR/o1.pid")" wait_for 120 1 grep 'dynflow: Acquired orchestrator lock, entering active mode.' "$(bg_output_file o2)" } + +@test "active orchestrator exits when pg goes away for good" { + cd "$(get_project_root)" + + run_background 'o1' bundle exec sidekiq -r ./examples/remote_executor.rb -q dynflow_orchestrator -c 1 + wait_for 30 1 grep 'dynflow: Acquired orchestrator lock, entering active mode.' "$(bg_output_file o1)" + + run_background 'w1' bundle exec sidekiq -r ./examples/remote_executor.rb -q default + wait_for 5 1 grep 'dynflow: Finished performing validity checks' "$(bg_output_file o1)" + + podman stop "$POSTGRES_CONTAINER_NAME" + wait_for 60 1 grep 'dynflow: World terminated, exiting.' "$(bg_output_file o1)" +} + +@test "active orchestrator can withstand temporary pg connection drop" { + cd "$(get_project_root)" + + run_background 'o1' bundle exec sidekiq -r ./examples/remote_executor.rb -q dynflow_orchestrator -c 1 + wait_for 30 1 grep 'dynflow: Acquired orchestrator lock, entering active mode.' "$(bg_output_file o1)" + + run_background 'w1' bundle exec sidekiq -r ./examples/remote_executor.rb -q default + wait_for 5 1 grep 'dynflow: Finished performing validity checks' "$(bg_output_file o1)" + + podman stop "$POSTGRES_CONTAINER_NAME" + wait_for 30 1 grep 'dynflow: Persistence retry no. 1' "$(bg_output_file o1)" + podman start "$POSTGRES_CONTAINER_NAME" + wait_for 30 1 grep 'dynflow: Executor heartbeat' "$(bg_output_file o1)" + + timeout 30 bundle exec ruby examples/remote_executor.rb client 1 + wait_for 1 1 grep -P 'dynflow: ExecutionPlan.*running >>.*stopped' "$(bg_output_file o1)" +}