From c839d900f75a2f6ad47f9b654e7660e8527a9b4c Mon Sep 17 00:00:00 2001 From: Adam Ruzicka Date: Thu, 12 Aug 2021 13:31:44 +0200 Subject: [PATCH 01/12] Fixes #33682 - Groundwork for pull provider --- lib/smart_proxy_remote_execution_ssh.rb | 4 + .../actions/pull_script.rb | 87 +++++++++++++++++++ lib/smart_proxy_remote_execution_ssh/api.rb | 38 ++++++++ .../plugin.rb | 1 + smart_proxy_remote_execution_ssh.gemspec | 1 + test/api_test.rb | 67 ++++++++++++++ 6 files changed, 198 insertions(+) create mode 100644 lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb diff --git a/lib/smart_proxy_remote_execution_ssh.rb b/lib/smart_proxy_remote_execution_ssh.rb index e430863..9c33fdf 100644 --- a/lib/smart_proxy_remote_execution_ssh.rb +++ b/lib/smart_proxy_remote_execution_ssh.rb @@ -51,6 +51,10 @@ def validate_ssh_log_level! Plugin.settings.ssh_log_level = Plugin.settings.ssh_log_level.to_sym end + + def job_storage + @job_storage ||= Proxy::MemoryStore.new + end end end end diff --git a/lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb b/lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb new file mode 100644 index 0000000..44c6b84 --- /dev/null +++ b/lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb @@ -0,0 +1,87 @@ +require 'mqtt' +require 'json' + +module Proxy::RemoteExecution::Ssh + class PullScript < Proxy::Dynflow::Action::Runner + JobDelivered = Class.new + + execution_plan_hooks.use :cleanup, :on => :stopped + + def plan(action_input, mqtt: false) + super(action_input) + input[:with_mqtt] = mqtt + end + + def run(event = nil) + if event == JobDelivered + output[:state] = :delivered + suspend + else + super + end + end + + def init_run + Proxy::RemoteExecution::Ssh::Plugin.job_storage["#{input[:hostname]}-#{execution_plan_id}", run_step_id, 'script.sh'] = input[:script] + output[:state] = :ready_for_pickup + mqtt_start if input[:with_mqtt] + suspend + end + + def cleanup(_plan = nil) + Proxy::RemoteExecution::Ssh::Plugin.job_storage.delete("#{input[:hostname]}-#{execution_plan_id}") + end + + def process_external_event(event) + output[:state] = :running + data = event.data + continuous_output = Proxy::Dynflow::ContinuousOutput.new + continuous_output.add_output(lines, 'stdout') if data.key?('output') + exit_code = data['exit_code'].to_i if data['exit_code'] + process_update(Proxy::Dynflow::Runner::Update.new(continuous_output, exit_code)) + end + + def kill_run + case output[:state] + when :ready_for_pickup + # If the job is not running yet on the client, wipe it from storage + cleanup + # TODO: Stop the action + when :notified, :running + # Client was notified or is already running, dealing with this situation + # is only supported if mqtt is available + # Otherwise we have to wait it out + if input[:with_mqtt] + cleanup + payload = {} # TODO + mqtt_notify payload + end + end + suspend + end + + def mqtt_start + payload = { + type: 'data', + message_id: SecureRandom.uuid, + version: 1, + sent: DateTime.now.iso8601, + directive: 'foreman', + metadata: { + 'return_url': "#{input[:proxy_url]}/job/#{execution_plan_id}/#{run_step_id}/update" + }, + content: "#{input[:proxy_url]}/job/store/#{execution_plan_id}/#{run_step_id}/script.sh", + } + mqtt_notify payload + output[:state] = :notified + end + + def mqtt_notify(payload) + broker = 'localhost' # TODO + broker_port = 1883 # TODO + MQTT::Client.connect(broker, broker_port) do |c| + c.publish("yggdrasil/#{input[:hostname]}/data/in", JSON.dump(payload), false, 1) + end + end + end +end diff --git a/lib/smart_proxy_remote_execution_ssh/api.rb b/lib/smart_proxy_remote_execution_ssh/api.rb index 52f82bc..01787fa 100644 --- a/lib/smart_proxy_remote_execution_ssh/api.rb +++ b/lib/smart_proxy_remote_execution_ssh/api.rb @@ -1,5 +1,6 @@ require 'net/ssh' require 'base64' +require 'smart_proxy_dynflow/runner' module Proxy::RemoteExecution module Ssh @@ -37,6 +38,43 @@ class Api < ::Sinatra::Base end 204 end + + # Payload is a hash where + # exit_code: Integer | NilClass + # output: String + post '/job/:task_id/:step_id/update' do |task_id, step_id| + do_authorize_with_ssl_client + + path = job_path(https_cert_cn, task_id, nil, nil).first + if Proxy::RemoteExecution::Ssh.job_storage[path].nil? + status 404 + return '' + end + + data = MultiJson.load(request.body.read) + world.event(task_id, step_id, ::Proxy::Dynflow::Runner::ExternalEvent.new(data)) + end + + get "/job/store/:task_id/:step_id/:file" do |task_id, step_id, file| + do_authorize_with_ssl_client + + path = job_path(https_cert_cn, task_id, step_id.to_i, file) + content = Proxy::RemoteExecution::Ssh.job_storage[*path] + if content + world.event(task_id, step_id.to_i, Proxy::RemoteExecution::Ssh::PullScript::JobDelivered) + return content + end + + status 404 + '' + end + + def job_path(hostname, task_id, step_id, file) + ["#{hostname}-#{task_id}", + step_id, + file, + ] + end end end end diff --git a/lib/smart_proxy_remote_execution_ssh/plugin.rb b/lib/smart_proxy_remote_execution_ssh/plugin.rb index 7e5d182..2bf643b 100644 --- a/lib/smart_proxy_remote_execution_ssh/plugin.rb +++ b/lib/smart_proxy_remote_execution_ssh/plugin.rb @@ -24,6 +24,7 @@ class Plugin < Proxy::Plugin require 'smart_proxy_remote_execution_ssh/cockpit' require 'smart_proxy_remote_execution_ssh/api' require 'smart_proxy_remote_execution_ssh/actions/run_script' + require 'smart_proxy_remote_execution_ssh/actions/pull_script' require 'smart_proxy_remote_execution_ssh/dispatcher' require 'smart_proxy_remote_execution_ssh/log_filter' require 'smart_proxy_remote_execution_ssh/runners' diff --git a/smart_proxy_remote_execution_ssh.gemspec b/smart_proxy_remote_execution_ssh.gemspec index dda2845..04947df 100644 --- a/smart_proxy_remote_execution_ssh.gemspec +++ b/smart_proxy_remote_execution_ssh.gemspec @@ -31,4 +31,5 @@ Gem::Specification.new do |gem| gem.add_runtime_dependency('smart_proxy_dynflow', '~> 0.5') gem.add_runtime_dependency('net-ssh', '>= 4.2.0') + gem.add_runtime_dependency('mqtt') end diff --git a/test/api_test.rb b/test/api_test.rb index 81c3d6d..e4a4aa7 100644 --- a/test/api_test.rb +++ b/test/api_test.rb @@ -1,5 +1,6 @@ require 'test_helper' require 'tempfile' +require 'smart_proxy_remote_execution_ssh/actions/pull_script' KNOWN_HOSTS = < 1 + _(last_response.status).must_equal 403 + end + + it 'returns 404 if job does not exist' do + Proxy::RemoteExecution::Ssh::Api.any_instance.expects(:https_cert_cn).returns(hostname) + post '/job/12346/1/update' + _(last_response.status).must_equal 404 + end + + it 'dispatches an event' do + Proxy::RemoteExecution::Ssh::Api.any_instance.expects(:https_cert_cn).returns(hostname) + fake_world = mock + fake_world.expects(:event) do |task_id, step_id, _payload| + task_id == '12345' && step_id == 1 + end + Proxy::RemoteExecution::Ssh::Api.any_instance.expects(:world).returns(fake_world) + + post '/job/12345/1/update', '{}' + _(last_response.status).must_equal 200 + end + end + + describe '/job/store' do + let(:hostname) { 'myhost.example.com' } + + before { Proxy::RemoteExecution::Ssh.job_storage["#{hostname}-12345", 1, 'message'] = 'hello' } + after { Proxy::RemoteExecution::Ssh.job_storage.delete("#{hostname}-12345") } + + it 'returns 403 if HTTPS is used and no cert is provided' do + get '/job/store/12345/1/message', {}, 'HTTPS' => 1 + _(last_response.status).must_equal 403 + end + + it 'returns content if there is some and notifies the action' do + Proxy::RemoteExecution::Ssh::Api.any_instance.expects(:https_cert_cn).returns(hostname) + fake_world = mock + fake_world.expects(:event).with('12345', 1, Proxy::RemoteExecution::Ssh::PullScript::JobDelivered) + Proxy::RemoteExecution::Ssh::Api.any_instance.expects(:world).returns(fake_world) + + get '/job/store/12345/1/message' + _(last_response.status).must_equal 200 + _(last_response.body).must_equal 'hello' + end + + it 'returns 404 if there is no content' do + Proxy::RemoteExecution::Ssh::Api.any_instance.expects(:https_cert_cn).times(3).returns(hostname) + + get '/job/store/12345/1/something.tar.gz' + _(last_response.status).must_equal 404 + + get '/job/store/12345/2/something.tar.gz' + _(last_response.status).must_equal 404 + + get '/job/store/12346/2/something.tar.gz' + _(last_response.status).must_equal 404 + end + end end end From 54efb6b0df69a8ca8974e1b991e3cf151a5d7deb Mon Sep 17 00:00:00 2001 From: Adam Ruzicka Date: Mon, 16 Aug 2021 14:44:48 +0200 Subject: [PATCH 02/12] Fixes #33682 - Make it configurable --- lib/smart_proxy_remote_execution_ssh.rb | 17 ++++++++++++++++ .../actions.rb | 6 ++++++ .../actions/pull_script.rb | 20 ++++++++++++------- .../actions/run_script.rb | 15 +++++++++++++- .../plugin.rb | 17 +++++++++++----- settings.d/remote_execution_ssh.yml.example | 8 +++++++- 6 files changed, 69 insertions(+), 14 deletions(-) create mode 100644 lib/smart_proxy_remote_execution_ssh/actions.rb diff --git a/lib/smart_proxy_remote_execution_ssh.rb b/lib/smart_proxy_remote_execution_ssh.rb index 9c33fdf..06a3fc4 100644 --- a/lib/smart_proxy_remote_execution_ssh.rb +++ b/lib/smart_proxy_remote_execution_ssh.rb @@ -20,7 +20,9 @@ def validate! raise "Ssh public key file #{public_key_file} doesn't exist" end + validate_mode! validate_ssh_log_level! + validate_mqtt_settings! end def private_key_file @@ -31,6 +33,21 @@ def public_key_file File.expand_path("#{private_key_file}.pub") end + def validate_mode! + Plugin.settings.mode = Plugin.settings.mode.to_sym + + unless Plugin::MODES.include? Plugin.settings.mode + raise "Mode has to be one of #{Plugin::MODES.join(', ')}, given #{Plugin.settings.mode}" + end + end + + def validate_mqtt_settings! + return unless Plugin.settings.mode == :'pull-mqtt' + + raise 'mqtt_broker has to be set when pull-mqtt mode is used' if Plugin.settings.mqtt_broker.nil? + raise 'mqtt_port has to be set when pull-mqtt mode is used' if Plugin.settings.mqtt_port.nil? + end + def validate_ssh_log_level! wanted_level = Plugin.settings.ssh_log_level.to_s levels = Plugin::SSH_LOG_LEVELS diff --git a/lib/smart_proxy_remote_execution_ssh/actions.rb b/lib/smart_proxy_remote_execution_ssh/actions.rb new file mode 100644 index 0000000..1ec6cd6 --- /dev/null +++ b/lib/smart_proxy_remote_execution_ssh/actions.rb @@ -0,0 +1,6 @@ +module Proxy::RemoteExecution::Ssh + module Actions + require 'smart_proxy_remote_execution_ssh/actions/run_script' + require 'smart_proxy_remote_execution_ssh/actions/pull_script' + end +end diff --git a/lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb b/lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb index 44c6b84..8a6f78f 100644 --- a/lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb +++ b/lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb @@ -1,7 +1,7 @@ require 'mqtt' require 'json' -module Proxy::RemoteExecution::Ssh +module Proxy::RemoteExecution::Ssh::Actions class PullScript < Proxy::Dynflow::Action::Runner JobDelivered = Class.new @@ -22,14 +22,14 @@ def run(event = nil) end def init_run - Proxy::RemoteExecution::Ssh::Plugin.job_storage["#{input[:hostname]}-#{execution_plan_id}", run_step_id, 'script.sh'] = input[:script] + job_storage["#{input[:hostname]}-#{execution_plan_id}", run_step_id, 'script.sh'] = input[:script] output[:state] = :ready_for_pickup mqtt_start if input[:with_mqtt] suspend end def cleanup(_plan = nil) - Proxy::RemoteExecution::Ssh::Plugin.job_storage.delete("#{input[:hostname]}-#{execution_plan_id}") + job_storage.delete("#{input[:hostname]}-#{execution_plan_id}") end def process_external_event(event) @@ -68,7 +68,7 @@ def mqtt_start sent: DateTime.now.iso8601, directive: 'foreman', metadata: { - 'return_url': "#{input[:proxy_url]}/job/#{execution_plan_id}/#{run_step_id}/update" + 'return_url': "#{input[:proxy_url]}/job/#{execution_plan_id}/#{run_step_id}/update", }, content: "#{input[:proxy_url]}/job/store/#{execution_plan_id}/#{run_step_id}/script.sh", } @@ -77,11 +77,17 @@ def mqtt_start end def mqtt_notify(payload) - broker = 'localhost' # TODO - broker_port = 1883 # TODO - MQTT::Client.connect(broker, broker_port) do |c| + MQTT::Client.connect(settings.mqtt_broker, settings.mqtt_port) do |c| c.publish("yggdrasil/#{input[:hostname]}/data/in", JSON.dump(payload), false, 1) end end + + def settings + Proxy::Plugin::RemoteExecution::Ssh::Plugin.settings + end + + def job_storage + Proxy::RemoteExecution::Ssh.job_storage + end end end diff --git a/lib/smart_proxy_remote_execution_ssh/actions/run_script.rb b/lib/smart_proxy_remote_execution_ssh/actions/run_script.rb index 6de585a..45448c2 100644 --- a/lib/smart_proxy_remote_execution_ssh/actions/run_script.rb +++ b/lib/smart_proxy_remote_execution_ssh/actions/run_script.rb @@ -3,7 +3,20 @@ module Proxy::RemoteExecution::Ssh module Actions - class RunScript < Proxy::Dynflow::Action::Runner + class RunScript < ::Dynflow::Action + def plan(*args) + mode = Proxy::RemoteExecution::Ssh::Plugin.settings.mode + case mode + when :ssh, :'ssh-async' + plan_action(ScriptRunner, *args) + when :pull, :'pull-mqtt' + plan_action(PullScript, *args, + mqtt: mode == :'pull-mqtt') + end + end + end + + class ScriptRunner < Proxy::Dynflow::Action::Runner def initiate_runner additional_options = { :step_id => run_step_id, diff --git a/lib/smart_proxy_remote_execution_ssh/plugin.rb b/lib/smart_proxy_remote_execution_ssh/plugin.rb index 2bf643b..eeccf89 100644 --- a/lib/smart_proxy_remote_execution_ssh/plugin.rb +++ b/lib/smart_proxy_remote_execution_ssh/plugin.rb @@ -1,6 +1,7 @@ module Proxy::RemoteExecution::Ssh class Plugin < Proxy::Plugin SSH_LOG_LEVELS = %w[debug info warn error fatal].freeze + MODES = %i[ssh async-ssh pull pull-mqtt] http_rackup_path File.expand_path("http_config.ru", File.expand_path("../", __FILE__)) https_rackup_path File.expand_path("http_config.ru", File.expand_path("../", __FILE__)) @@ -11,11 +12,13 @@ class Plugin < Proxy::Plugin :remote_working_dir => '/var/tmp', :local_working_dir => '/var/tmp', :kerberos_auth => false, - :async_ssh => false, # When set to nil, makes REX use the runner's default interval # :runner_refresh_interval => nil, :ssh_log_level => :fatal, - :cleanup_working_dirs => true + :cleanup_working_dirs => true, + # :mqtt_broker => nil, + # :mqtt_port => nil, + :mode => :ssh plugin :ssh, Proxy::RemoteExecution::Ssh::VERSION after_activation do @@ -23,8 +26,7 @@ class Plugin < Proxy::Plugin require 'smart_proxy_remote_execution_ssh/version' require 'smart_proxy_remote_execution_ssh/cockpit' require 'smart_proxy_remote_execution_ssh/api' - require 'smart_proxy_remote_execution_ssh/actions/run_script' - require 'smart_proxy_remote_execution_ssh/actions/pull_script' + require 'smart_proxy_remote_execution_ssh/actions' require 'smart_proxy_remote_execution_ssh/dispatcher' require 'smart_proxy_remote_execution_ssh/log_filter' require 'smart_proxy_remote_execution_ssh/runners' @@ -32,6 +34,11 @@ class Plugin < Proxy::Plugin Proxy::RemoteExecution::Ssh.validate! + # TODO: Move into more native methods when available + if settings.async_ssh + logger.warn('Option async_ssh is deprecated, use ssh-async mode instead.') + end + Proxy::Dynflow::TaskLauncherRegistry.register('ssh', Proxy::Dynflow::TaskLauncher::Batch) end @@ -42,7 +49,7 @@ def self.simulate? def self.runner_class @runner_class ||= if simulate? Runners::FakeScriptRunner - elsif settings[:async_ssh] + elsif settings.mode == :'ssh-async' Runners::PollingScriptRunner else Runners::ScriptRunner diff --git a/settings.d/remote_execution_ssh.yml.example b/settings.d/remote_execution_ssh.yml.example index 7d0412d..d4214ad 100644 --- a/settings.d/remote_execution_ssh.yml.example +++ b/settings.d/remote_execution_ssh.yml.example @@ -4,7 +4,9 @@ :local_working_dir: '/var/tmp' :remote_working_dir: '/var/tmp' # :kerberos_auth: false -# :async_ssh: false + +# Mode of operation, one of ssh, ssh-async, pull, pull-mqtt +:mode: ssh # Defines how often (in seconds) should the runner check # for new data leave empty to use the runner's default @@ -18,3 +20,7 @@ # Remove working directories on job completion # :cleanup_working_dirs: true + +# MQTT configuration, need to be set if mode is set to pull-mqtt +# :mqtt_broker: localhost +# :mqtt_port: 1883 From ca28a304132e24e9468efb28039d7a878a980366 Mon Sep 17 00:00:00 2001 From: Adam Ruzicka Date: Tue, 17 Aug 2021 12:46:32 +0200 Subject: [PATCH 03/12] Fixes #33682 - Rework storage - now uses in-memory sqlite - each job is identified by its uuid - get /jobs returns a list of job uuids available for a given host - get /jobs/:job_uuid returns the job contents - post /jobs/:job_uuid/update lets client upload updates for a job --- lib/smart_proxy_remote_execution_ssh.rb | 19 ++- .../actions/pull_script.rb | 16 ++- lib/smart_proxy_remote_execution_ssh/api.rb | 42 +++--- test/api_test.rb | 127 +++++++++++------- 4 files changed, 132 insertions(+), 72 deletions(-) diff --git a/lib/smart_proxy_remote_execution_ssh.rb b/lib/smart_proxy_remote_execution_ssh.rb index 06a3fc4..9de563c 100644 --- a/lib/smart_proxy_remote_execution_ssh.rb +++ b/lib/smart_proxy_remote_execution_ssh.rb @@ -2,6 +2,7 @@ require 'smart_proxy_remote_execution_ssh/version' require 'smart_proxy_remote_execution_ssh/plugin' require 'smart_proxy_remote_execution_ssh/webrick_ext' +require 'sequel' module Proxy::RemoteExecution module Ssh @@ -70,7 +71,23 @@ def validate_ssh_log_level! end def job_storage - @job_storage ||= Proxy::MemoryStore.new + @job_storage ||= initialize_job_storage + @job_storage[:jobs] + end + + private + + def initialize_job_storage + db = Sequel.sqlite + db.create_table :jobs do + DateTime :timestamp, null: false + String :uuid, fixed: true, size: 36, primary_key: true, null: false + String :hostname, null: false, index: true + String :execution_plan_uuid, fixed: true, size: 36, null: false, index: true + Integer :run_step_id, null: false + String :job, text: true + end + db end end end diff --git a/lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb b/lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb index 8a6f78f..4e1582b 100644 --- a/lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb +++ b/lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb @@ -10,6 +10,7 @@ class PullScript < Proxy::Dynflow::Action::Runner def plan(action_input, mqtt: false) super(action_input) input[:with_mqtt] = mqtt + input[:job_uuid] = SecureRandom.uuid end def run(event = nil) @@ -22,14 +23,20 @@ def run(event = nil) end def init_run - job_storage["#{input[:hostname]}-#{execution_plan_id}", run_step_id, 'script.sh'] = input[:script] + job_storage.insert(timestamp: Time.now.utc, + uuid: input[:job_uuid], + hostname: input[:hostname], + execution_plan_uuid: execution_plan_id, + run_step_id: run_step_id, + job: input[:script]) + # job_storage["#{input[:hostname]}-#{execution_plan_id}", run_step_id, 'script.sh'] = input[:script] output[:state] = :ready_for_pickup mqtt_start if input[:with_mqtt] suspend end def cleanup(_plan = nil) - job_storage.delete("#{input[:hostname]}-#{execution_plan_id}") + job_storage.where(execution_plan_uuid: execution_plan_id, run_step_id: run_step_id).delete end def process_external_event(event) @@ -68,9 +75,10 @@ def mqtt_start sent: DateTime.now.iso8601, directive: 'foreman', metadata: { - 'return_url': "#{input[:proxy_url]}/job/#{execution_plan_id}/#{run_step_id}/update", + 'job_uuid': input[:job_uuid], + 'return_url': "#{input[:proxy_url]}/jobs/#{input[:job_uuid]}/update", }, - content: "#{input[:proxy_url]}/job/store/#{execution_plan_id}/#{run_step_id}/script.sh", + content: "#{input[:proxy_url]}/jobs/#{input[:job_uuid]}", } mqtt_notify payload output[:state] = :notified diff --git a/lib/smart_proxy_remote_execution_ssh/api.rb b/lib/smart_proxy_remote_execution_ssh/api.rb index 01787fa..375709e 100644 --- a/lib/smart_proxy_remote_execution_ssh/api.rb +++ b/lib/smart_proxy_remote_execution_ssh/api.rb @@ -42,38 +42,42 @@ class Api < ::Sinatra::Base # Payload is a hash where # exit_code: Integer | NilClass # output: String - post '/job/:task_id/:step_id/update' do |task_id, step_id| + post '/jobs/:job_uuid/update' do |job_uuid| do_authorize_with_ssl_client - path = job_path(https_cert_cn, task_id, nil, nil).first - if Proxy::RemoteExecution::Ssh.job_storage[path].nil? + job_record = Proxy::RemoteExecution::Ssh.job_storage.where(uuid: job_uuid, hostname: https_cert_cn).first + if job_record.nil? status 404 return '' end data = MultiJson.load(request.body.read) - world.event(task_id, step_id, ::Proxy::Dynflow::Runner::ExternalEvent.new(data)) + world.event job_record[:execution_plan_uuid], + job_record[:run_step_id], + ::Proxy::Dynflow::Runner::ExternalEvent.new(data) end - get "/job/store/:task_id/:step_id/:file" do |task_id, step_id, file| + get '/jobs' do do_authorize_with_ssl_client - path = job_path(https_cert_cn, task_id, step_id.to_i, file) - content = Proxy::RemoteExecution::Ssh.job_storage[*path] - if content - world.event(task_id, step_id.to_i, Proxy::RemoteExecution::Ssh::PullScript::JobDelivered) - return content - end - - status 404 - '' + uuids = Proxy::RemoteExecution::Ssh.job_storage.order(:timestamp) + .where(hostname: https_cert_cn) + .select_map(:uuid) + MultiJson.dump(uuids) end - def job_path(hostname, task_id, step_id, file) - ["#{hostname}-#{task_id}", - step_id, - file, - ] + get "/jobs/:job_uuid" do |job_uuid| + do_authorize_with_ssl_client + + job_record = Proxy::RemoteExecution::Ssh.job_storage.where(uuid: job_uuid, hostname: https_cert_cn).first + + if job_record.nil? + status 404 + return '' + end + + world.event(job_record[:execution_plan_uuid], job_record[:run_step_id], Actions::PullScript::JobDelivered) + job_record[:job] end end end diff --git a/test/api_test.rb b/test/api_test.rb index e4a4aa7..9733e17 100644 --- a/test/api_test.rb +++ b/test/api_test.rb @@ -80,69 +80,100 @@ def with_known_hosts end end - describe '/job/update' do - let(:hostname) { 'myhost.example.com' } - - before { Proxy::RemoteExecution::Ssh.job_storage["#{hostname}-12345", 1, 'message'] = 'hello' } - after { Proxy::RemoteExecution::Ssh.job_storage.delete("#{hostname}-12345") } - - it 'returns 403 if HTTPS is used and no cert is provided' do - post '/job/12345/1/update', {}, 'HTTPS' => 1 - _(last_response.status).must_equal 403 + describe 'job storage' do + let(:uuid) { SecureRandom.uuid } + let(:execution_plan_uuid) { SecureRandom.uuid } + let(:run_step_id) { 1 } + let(:hostname) { 'something.somewhere.com' } + let(:content) { 'content' } + + before do + Proxy::RemoteExecution::Ssh + .job_storage + .insert(timestamp: Time.now.utc, + uuid: uuid, + hostname: hostname, + execution_plan_uuid: execution_plan_uuid, + run_step_id: run_step_id, + job: content) end - it 'returns 404 if job does not exist' do - Proxy::RemoteExecution::Ssh::Api.any_instance.expects(:https_cert_cn).returns(hostname) - post '/job/12346/1/update' - _(last_response.status).must_equal 404 + after do + Proxy::RemoteExecution::Ssh.job_storage.delete end - it 'dispatches an event' do - Proxy::RemoteExecution::Ssh::Api.any_instance.expects(:https_cert_cn).returns(hostname) - fake_world = mock - fake_world.expects(:event) do |task_id, step_id, _payload| - task_id == '12345' && step_id == 1 + describe '/jobs/update' do + it 'returns 403 if HTTPS is used and no cert is provided' do + post '/jobs/12345/update', {}, 'HTTPS' => 1 + _(last_response.status).must_equal 403 end - Proxy::RemoteExecution::Ssh::Api.any_instance.expects(:world).returns(fake_world) - - post '/job/12345/1/update', '{}' - _(last_response.status).must_equal 200 - end - end - describe '/job/store' do - let(:hostname) { 'myhost.example.com' } + it 'returns 404 if job does not exist' do + Proxy::RemoteExecution::Ssh::Api.any_instance.expects(:https_cert_cn).returns(hostname) + post '/jobs/12345/update' + _(last_response.status).must_equal 404 + end - before { Proxy::RemoteExecution::Ssh.job_storage["#{hostname}-12345", 1, 'message'] = 'hello' } - after { Proxy::RemoteExecution::Ssh.job_storage.delete("#{hostname}-12345") } + it 'dispatches an event' do + Proxy::RemoteExecution::Ssh::Api.any_instance.expects(:https_cert_cn).returns(hostname) + fake_world = mock + fake_world.expects(:event) do |task_id, step_id, _payload| + task_id == execution_plan_uuid && step_id == run_step_id + end + Proxy::RemoteExecution::Ssh::Api.any_instance.expects(:world).returns(fake_world) - it 'returns 403 if HTTPS is used and no cert is provided' do - get '/job/store/12345/1/message', {}, 'HTTPS' => 1 - _(last_response.status).must_equal 403 + post "/jobs/#{uuid}/update", '{}' + _(last_response.status).must_equal 200 + end end - it 'returns content if there is some and notifies the action' do - Proxy::RemoteExecution::Ssh::Api.any_instance.expects(:https_cert_cn).returns(hostname) - fake_world = mock - fake_world.expects(:event).with('12345', 1, Proxy::RemoteExecution::Ssh::PullScript::JobDelivered) - Proxy::RemoteExecution::Ssh::Api.any_instance.expects(:world).returns(fake_world) + describe '/jobs/:job_uuid' do + it 'returns 403 if HTTPS is used and no cert is provided' do + get '/jobs/12345', {}, 'HTTPS' => 1 + _(last_response.status).must_equal 403 + end - get '/job/store/12345/1/message' - _(last_response.status).must_equal 200 - _(last_response.body).must_equal 'hello' - end + it 'returns content if there is some and notifies the action' do + Proxy::RemoteExecution::Ssh::Api.any_instance.expects(:https_cert_cn).returns(hostname) + fake_world = mock + fake_world.expects(:event).with(execution_plan_uuid, run_step_id, Actions::PullScript::JobDelivered) + Proxy::RemoteExecution::Ssh::Api.any_instance.expects(:world).returns(fake_world) - it 'returns 404 if there is no content' do - Proxy::RemoteExecution::Ssh::Api.any_instance.expects(:https_cert_cn).times(3).returns(hostname) + get "/jobs/#{uuid}" + _(last_response.status).must_equal 200 + _(last_response.body).must_equal content + end - get '/job/store/12345/1/something.tar.gz' - _(last_response.status).must_equal 404 + it 'returns 404 if there is no content' do + Proxy::RemoteExecution::Ssh::Api.any_instance.expects(:https_cert_cn).returns(hostname) - get '/job/store/12345/2/something.tar.gz' - _(last_response.status).must_equal 404 + get '/jobs/12345' + _(last_response.status).must_equal 404 + end + end - get '/job/store/12346/2/something.tar.gz' - _(last_response.status).must_equal 404 + describe '/jobs' do + it 'returns 403 if HTTPS is used and no cert is provided' do + get '/jobs', {}, 'HTTPS' => 1 + _(last_response.status).must_equal 403 + end + + it 'returns a list of job uuids for a given host' do + Proxy::RemoteExecution::Ssh::Api.any_instance.expects(:https_cert_cn).returns(hostname) + Proxy::RemoteExecution::Ssh + .job_storage + .insert(timestamp: Time.now.utc, + uuid: SecureRandom.uuid, + hostname: 'another.host', + execution_plan_uuid: SecureRandom.uuid, + run_step_id: 1, + job: 'hello') + + get '/jobs' + _(last_response.status).must_equal 200 + data = MultiJson.load(last_response.body) + _(data).must_equal [uuid] + end end end end From 470ecca1c312e3af9a50941e0df77bb5f1bbb787 Mon Sep 17 00:00:00 2001 From: Adam Ruzicka Date: Tue, 17 Aug 2021 14:01:40 +0200 Subject: [PATCH 04/12] Fixes #33682 - Extract job store operations into a separate class --- lib/smart_proxy_remote_execution_ssh.rb | 18 +------ .../actions/pull_script.rb | 11 +--- lib/smart_proxy_remote_execution_ssh/api.rb | 10 ++-- .../job_storage.rb | 50 +++++++++++++++++++ .../plugin.rb | 1 + test/api_test.rb | 25 ++++------ 6 files changed, 66 insertions(+), 49 deletions(-) create mode 100644 lib/smart_proxy_remote_execution_ssh/job_storage.rb diff --git a/lib/smart_proxy_remote_execution_ssh.rb b/lib/smart_proxy_remote_execution_ssh.rb index 9de563c..3547e07 100644 --- a/lib/smart_proxy_remote_execution_ssh.rb +++ b/lib/smart_proxy_remote_execution_ssh.rb @@ -71,23 +71,7 @@ def validate_ssh_log_level! end def job_storage - @job_storage ||= initialize_job_storage - @job_storage[:jobs] - end - - private - - def initialize_job_storage - db = Sequel.sqlite - db.create_table :jobs do - DateTime :timestamp, null: false - String :uuid, fixed: true, size: 36, primary_key: true, null: false - String :hostname, null: false, index: true - String :execution_plan_uuid, fixed: true, size: 36, null: false, index: true - Integer :run_step_id, null: false - String :job, text: true - end - db + @job_storage ||= Proxy::RemoteExecution::Ssh::JobStorage.new end end end diff --git a/lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb b/lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb index 4e1582b..f072eb6 100644 --- a/lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb +++ b/lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb @@ -10,7 +10,6 @@ class PullScript < Proxy::Dynflow::Action::Runner def plan(action_input, mqtt: false) super(action_input) input[:with_mqtt] = mqtt - input[:job_uuid] = SecureRandom.uuid end def run(event = nil) @@ -23,20 +22,14 @@ def run(event = nil) end def init_run - job_storage.insert(timestamp: Time.now.utc, - uuid: input[:job_uuid], - hostname: input[:hostname], - execution_plan_uuid: execution_plan_id, - run_step_id: run_step_id, - job: input[:script]) - # job_storage["#{input[:hostname]}-#{execution_plan_id}", run_step_id, 'script.sh'] = input[:script] + input[:job_uuid] = job_storage.store_job(input[:hostname], execution_plan_id, run_step_id, input[:script]) output[:state] = :ready_for_pickup mqtt_start if input[:with_mqtt] suspend end def cleanup(_plan = nil) - job_storage.where(execution_plan_uuid: execution_plan_id, run_step_id: run_step_id).delete + job_storage.drop_job(execution_plan_id, run_step_id) end def process_external_event(event) diff --git a/lib/smart_proxy_remote_execution_ssh/api.rb b/lib/smart_proxy_remote_execution_ssh/api.rb index 375709e..0c8dead 100644 --- a/lib/smart_proxy_remote_execution_ssh/api.rb +++ b/lib/smart_proxy_remote_execution_ssh/api.rb @@ -45,7 +45,7 @@ class Api < ::Sinatra::Base post '/jobs/:job_uuid/update' do |job_uuid| do_authorize_with_ssl_client - job_record = Proxy::RemoteExecution::Ssh.job_storage.where(uuid: job_uuid, hostname: https_cert_cn).first + job_record = Proxy::RemoteExecution::Ssh.job_storage.find_job(https_cert_cn, job_uuid) if job_record.nil? status 404 return '' @@ -60,17 +60,13 @@ class Api < ::Sinatra::Base get '/jobs' do do_authorize_with_ssl_client - uuids = Proxy::RemoteExecution::Ssh.job_storage.order(:timestamp) - .where(hostname: https_cert_cn) - .select_map(:uuid) - MultiJson.dump(uuids) + MultiJson.dump(Proxy::RemoteExecution::Ssh.job_storage.job_uuids_for_host(https_cert_cn)) end get "/jobs/:job_uuid" do |job_uuid| do_authorize_with_ssl_client - job_record = Proxy::RemoteExecution::Ssh.job_storage.where(uuid: job_uuid, hostname: https_cert_cn).first - + job_record = Proxy::RemoteExecution::Ssh.job_storage.find_job(https_cert_cn, job_uuid) if job_record.nil? status 404 return '' diff --git a/lib/smart_proxy_remote_execution_ssh/job_storage.rb b/lib/smart_proxy_remote_execution_ssh/job_storage.rb new file mode 100644 index 0000000..214a18a --- /dev/null +++ b/lib/smart_proxy_remote_execution_ssh/job_storage.rb @@ -0,0 +1,50 @@ +# lib/job_storage.rb + +module Proxy::RemoteExecution::Ssh + class JobStorage + def initialize + @db = Sequel.sqlite + @db.create_table :jobs do + DateTime :timestamp, null: false, default: Sequel::CURRENT_TIMESTAMP + String :uuid, fixed: true, size: 36, primary_key: true, null: false + String :hostname, null: false, index: true + String :execution_plan_uuid, fixed: true, size: 36, null: false, index: true + Integer :run_step_id, null: false + String :job, text: true + end + end + + def find_job(hostname, uuid) + jobs_for_host(hostname).where(uuid: uuid).first + end + + def job_uuids_for_host(hostname) + jobs_for_host(hostname).order(:timestamp) + .select_map(:uuid) + end + + def store_job(hostname, execution_plan_uuid, run_step_id, job, uuid: SecureRandom.uuid, timestamp: Time.now.utc) + jobs.insert(timestamp: timestamp, + uuid: uuid, + hostname: hostname, + execution_plan_uuid: execution_plan_uuid, + run_step_id: run_step_id, + job: job) + uuid + end + + def drop_job(execution_plan_uuid, run_step_id) + jobs.where(execution_plan_uuid: execution_plan_id, run_step_id: run_step_id).delete + end + + private + + def jobs_for_host(hostname) + jobs.where(hostname: hostname) + end + + def jobs + @db[:jobs] + end + end +end diff --git a/lib/smart_proxy_remote_execution_ssh/plugin.rb b/lib/smart_proxy_remote_execution_ssh/plugin.rb index eeccf89..cbf24f7 100644 --- a/lib/smart_proxy_remote_execution_ssh/plugin.rb +++ b/lib/smart_proxy_remote_execution_ssh/plugin.rb @@ -31,6 +31,7 @@ class Plugin < Proxy::Plugin require 'smart_proxy_remote_execution_ssh/log_filter' require 'smart_proxy_remote_execution_ssh/runners' require 'smart_proxy_remote_execution_ssh/utils' + require 'smart_proxy_remote_execution_ssh/job_storage' Proxy::RemoteExecution::Ssh.validate! diff --git a/test/api_test.rb b/test/api_test.rb index 9733e17..5f0b95e 100644 --- a/test/api_test.rb +++ b/test/api_test.rb @@ -1,6 +1,7 @@ require 'test_helper' require 'tempfile' require 'smart_proxy_remote_execution_ssh/actions/pull_script' +require 'smart_proxy_remote_execution_ssh/job_storage' KNOWN_HOSTS = < Date: Wed, 18 Aug 2021 13:33:57 +0200 Subject: [PATCH 05/12] Fixes #33682 - Add support for password based auth --- .../actions/pull_script.rb | 10 +++- lib/smart_proxy_remote_execution_ssh/api.rb | 39 ++++++++++------ .../job_storage.rb | 4 +- test/api_test.rb | 46 ++++++++++++++++++- 4 files changed, 79 insertions(+), 20 deletions(-) diff --git a/lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb b/lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb index f072eb6..7ae9fab 100644 --- a/lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb +++ b/lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb @@ -22,14 +22,18 @@ def run(event = nil) end def init_run + otp_password = if input[:with_mqtt] + ::Proxy::Dynflow::Manager.generate_otp(execution_plan_id) + end input[:job_uuid] = job_storage.store_job(input[:hostname], execution_plan_id, run_step_id, input[:script]) output[:state] = :ready_for_pickup - mqtt_start if input[:with_mqtt] + mqtt_start(otp_password) if input[:with_mqtt] suspend end def cleanup(_plan = nil) job_storage.drop_job(execution_plan_id, run_step_id) + Proxy::Dynflow::OtpManager.passwords.delete(execution_plan_id) end def process_external_event(event) @@ -60,7 +64,7 @@ def kill_run suspend end - def mqtt_start + def mqtt_start(otp_password) payload = { type: 'data', message_id: SecureRandom.uuid, @@ -69,6 +73,8 @@ def mqtt_start directive: 'foreman', metadata: { 'job_uuid': input[:job_uuid], + 'username': execution_plan_id, + 'password': otp_password, 'return_url': "#{input[:proxy_url]}/jobs/#{input[:job_uuid]}/update", }, content: "#{input[:proxy_url]}/jobs/#{input[:job_uuid]}", diff --git a/lib/smart_proxy_remote_execution_ssh/api.rb b/lib/smart_proxy_remote_execution_ssh/api.rb index 0c8dead..266d12c 100644 --- a/lib/smart_proxy_remote_execution_ssh/api.rb +++ b/lib/smart_proxy_remote_execution_ssh/api.rb @@ -7,6 +7,7 @@ module Ssh class Api < ::Sinatra::Base include Sinatra::Authorization::Helpers + include Proxy::Dynflow::Helpers get "/pubkey" do File.read(Ssh.public_key_file) @@ -45,16 +46,12 @@ class Api < ::Sinatra::Base post '/jobs/:job_uuid/update' do |job_uuid| do_authorize_with_ssl_client - job_record = Proxy::RemoteExecution::Ssh.job_storage.find_job(https_cert_cn, job_uuid) - if job_record.nil? - status 404 - return '' + with_authorized_job(job_uuid) do |job_record| + data = MultiJson.load(request.body.read) + world.event job_record[:execution_plan_uuid], + job_record[:run_step_id], + ::Proxy::Dynflow::Runner::ExternalEvent.new(data) end - - data = MultiJson.load(request.body.read) - world.event job_record[:execution_plan_uuid], - job_record[:run_step_id], - ::Proxy::Dynflow::Runner::ExternalEvent.new(data) end get '/jobs' do @@ -66,14 +63,26 @@ class Api < ::Sinatra::Base get "/jobs/:job_uuid" do |job_uuid| do_authorize_with_ssl_client - job_record = Proxy::RemoteExecution::Ssh.job_storage.find_job(https_cert_cn, job_uuid) - if job_record.nil? - status 404 - return '' + with_authorized_job(job_uuid) do |job_record| + world.event(job_record[:execution_plan_uuid], job_record[:run_step_id], Actions::PullScript::JobDelivered) + job_record[:job] + end + end + + private + + def with_authorized_job(uuid) + if (job = authorized_job(uuid)) + yield job + else + halt 404 end + end - world.event(job_record[:execution_plan_uuid], job_record[:run_step_id], Actions::PullScript::JobDelivered) - job_record[:job] + def authorized_job(uuid) + job_record = Proxy::RemoteExecution::Ssh.job_storage.find_job(uuid) || {} + return job_record if authorize_with_token(clear: false, task_id: job_record[:execution_plan_uuid]) || + job_record[:hostname] == https_cert_cn end end end diff --git a/lib/smart_proxy_remote_execution_ssh/job_storage.rb b/lib/smart_proxy_remote_execution_ssh/job_storage.rb index 214a18a..2f1c582 100644 --- a/lib/smart_proxy_remote_execution_ssh/job_storage.rb +++ b/lib/smart_proxy_remote_execution_ssh/job_storage.rb @@ -14,8 +14,8 @@ def initialize end end - def find_job(hostname, uuid) - jobs_for_host(hostname).where(uuid: uuid).first + def find_job(uuid) + jobs.where(uuid: uuid).first end def job_uuids_for_host(hostname) diff --git a/test/api_test.rb b/test/api_test.rb index 5f0b95e..b39c8bf 100644 --- a/test/api_test.rb +++ b/test/api_test.rb @@ -2,6 +2,7 @@ require 'tempfile' require 'smart_proxy_remote_execution_ssh/actions/pull_script' require 'smart_proxy_remote_execution_ssh/job_storage' +require 'smart_proxy_dynflow/otp_manager' KNOWN_HOSTS = < "Basic #{auth}" + _(last_response.status).must_equal 403 + end + it 'returns 404 if job does not exist' do Proxy::RemoteExecution::Ssh::Api.any_instance.expects(:https_cert_cn).returns(hostname) - post '/jobs/12345/update' + post '/jobs/12345/update', {} _(last_response.status).must_equal 404 end + it 'supports http basic auth' do + pass = Proxy::Dynflow::OtpManager.generate_otp(execution_plan_uuid) + auth = Proxy::Dynflow::OtpManager.tokenize(execution_plan_uuid, pass) + + fake_world = mock + fake_world.expects(:event) do |task_id, step_id, _payload| + task_id == execution_plan_uuid && step_id == run_step_id + end + Proxy::RemoteExecution::Ssh::Api.any_instance.expects(:world).returns(fake_world) + + post "/jobs/#{uuid}/update", '{}', 'HTTP_AUTHORIZATION' => "Basic #{auth}" + _(last_response.status).must_equal 200 + + Proxy::Dynflow::OtpManager.passwords.delete(execution_plan_uuid) + end + it 'dispatches an event' do Proxy::RemoteExecution::Ssh::Api.any_instance.expects(:https_cert_cn).returns(hostname) fake_world = mock @@ -131,6 +154,27 @@ def with_known_hosts _(last_response.status).must_equal 403 end + it 'returns 403 when wrong credentials are supplied' do + auth = Proxy::Dynflow::OtpManager.tokenize('username', 'password') + get '/jobs/12345', {}, 'HTTP_AUTHORIZATION' => "Basic #{auth}" + _(last_response.status).must_equal 403 + end + + it 'returns content if there is some and notifies the action when using password' do + pass = Proxy::Dynflow::OtpManager.generate_otp(execution_plan_uuid) + auth = Proxy::Dynflow::OtpManager.tokenize(execution_plan_uuid, pass) + + fake_world = mock + fake_world.expects(:event).with(execution_plan_uuid, run_step_id, Actions::PullScript::JobDelivered) + Proxy::RemoteExecution::Ssh::Api.any_instance.expects(:world).returns(fake_world) + + get "/jobs/#{uuid}", {}, 'HTTP_AUTHORIZATION' => "Basic #{auth}" + _(last_response.status).must_equal 200 + _(last_response.body).must_equal content + + Proxy::Dynflow::OtpManager.passwords.delete(execution_plan_uuid) + end + it 'returns content if there is some and notifies the action' do Proxy::RemoteExecution::Ssh::Api.any_instance.expects(:https_cert_cn).returns(hostname) fake_world = mock From 870f2fcbd49b982b339106aa72ec8212fadbbab4 Mon Sep 17 00:00:00 2001 From: Adam Ruzicka Date: Wed, 18 Aug 2021 13:38:51 +0200 Subject: [PATCH 06/12] Fixes #33682 - Cleaner event dispatch --- lib/smart_proxy_remote_execution_ssh/api.rb | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/lib/smart_proxy_remote_execution_ssh/api.rb b/lib/smart_proxy_remote_execution_ssh/api.rb index 266d12c..7af69b4 100644 --- a/lib/smart_proxy_remote_execution_ssh/api.rb +++ b/lib/smart_proxy_remote_execution_ssh/api.rb @@ -48,9 +48,7 @@ class Api < ::Sinatra::Base with_authorized_job(job_uuid) do |job_record| data = MultiJson.load(request.body.read) - world.event job_record[:execution_plan_uuid], - job_record[:run_step_id], - ::Proxy::Dynflow::Runner::ExternalEvent.new(data) + notify_job(job_record, ::Proxy::Dynflow::Runner::ExternalEvent.new(data)) end end @@ -64,13 +62,17 @@ class Api < ::Sinatra::Base do_authorize_with_ssl_client with_authorized_job(job_uuid) do |job_record| - world.event(job_record[:execution_plan_uuid], job_record[:run_step_id], Actions::PullScript::JobDelivered) + notify_job(job_record, Actions::PullScript::JobDelivered) job_record[:job] end end private + def notify_job(job_record, event) + world.event(job_record[:execution_plan_uuid], job_record[:run_step_id], event) + end + def with_authorized_job(uuid) if (job = authorized_job(uuid)) yield job From dc26154f81588e66d8246f89b4c11541658983b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20R=C5=AF=C5=BEi=C4=8Dka?= Date: Tue, 24 Aug 2021 15:07:29 +0200 Subject: [PATCH 07/12] Fixes #33682 - Apply suggestions from code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Ondřej Ezr --- lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb b/lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb index 7ae9fab..bc65c7f 100644 --- a/lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb +++ b/lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb @@ -23,7 +23,7 @@ def run(event = nil) def init_run otp_password = if input[:with_mqtt] - ::Proxy::Dynflow::Manager.generate_otp(execution_plan_id) + ::Proxy::Dynflow::OtpManager.generate_otp(execution_plan_id) end input[:job_uuid] = job_storage.store_job(input[:hostname], execution_plan_id, run_step_id, input[:script]) output[:state] = :ready_for_pickup @@ -90,7 +90,7 @@ def mqtt_notify(payload) end def settings - Proxy::Plugin::RemoteExecution::Ssh::Plugin.settings + Proxy::RemoteExecution::Ssh::Plugin.settings end def job_storage From c1eee2e0be600e54cdddb4aa724cedb598d10ea8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20R=C5=AF=C5=BEi=C4=8Dka?= Date: Fri, 3 Sep 2021 14:04:24 +0200 Subject: [PATCH 08/12] Fixes #33682 - Apply suggestions from code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Ondřej Ezr --- .../actions/pull_script.rb | 7 ++++--- lib/smart_proxy_remote_execution_ssh/job_storage.rb | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb b/lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb index bc65c7f..1f1c12e 100644 --- a/lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb +++ b/lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb @@ -27,6 +27,7 @@ def init_run end input[:job_uuid] = job_storage.store_job(input[:hostname], execution_plan_id, run_step_id, input[:script]) output[:state] = :ready_for_pickup + output[:result] = [] mqtt_start(otp_password) if input[:with_mqtt] suspend end @@ -40,7 +41,7 @@ def process_external_event(event) output[:state] = :running data = event.data continuous_output = Proxy::Dynflow::ContinuousOutput.new - continuous_output.add_output(lines, 'stdout') if data.key?('output') + Array(data['output']).each { |line| continuous_output.add_output(line, 'stdout') } if data.key?('output') exit_code = data['exit_code'].to_i if data['exit_code'] process_update(Proxy::Dynflow::Runner::Update.new(continuous_output, exit_code)) end @@ -75,9 +76,9 @@ def mqtt_start(otp_password) 'job_uuid': input[:job_uuid], 'username': execution_plan_id, 'password': otp_password, - 'return_url': "#{input[:proxy_url]}/jobs/#{input[:job_uuid]}/update", + 'return_url': "#{input[:proxy_url]}/ssh/jobs/#{input[:job_uuid]}/update", }, - content: "#{input[:proxy_url]}/jobs/#{input[:job_uuid]}", + content: "#{input[:proxy_url]}/ssh/jobs/#{input[:job_uuid]}", } mqtt_notify payload output[:state] = :notified diff --git a/lib/smart_proxy_remote_execution_ssh/job_storage.rb b/lib/smart_proxy_remote_execution_ssh/job_storage.rb index 2f1c582..7503e07 100644 --- a/lib/smart_proxy_remote_execution_ssh/job_storage.rb +++ b/lib/smart_proxy_remote_execution_ssh/job_storage.rb @@ -34,7 +34,7 @@ def store_job(hostname, execution_plan_uuid, run_step_id, job, uuid: SecureRando end def drop_job(execution_plan_uuid, run_step_id) - jobs.where(execution_plan_uuid: execution_plan_id, run_step_id: run_step_id).delete + jobs.where(execution_plan_uuid: execution_plan_uuid, run_step_id: run_step_id).delete end private From 6462ce51ed86f8fc1cd04c3bf0a2280cd8229377 Mon Sep 17 00:00:00 2001 From: Adam Ruzicka Date: Fri, 3 Sep 2021 14:23:14 +0200 Subject: [PATCH 09/12] Fixes #33682 - Prefer consumer uuid over hostname, if available --- .../actions/pull_script.rb | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb b/lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb index 1f1c12e..e2a3631 100644 --- a/lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb +++ b/lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb @@ -25,7 +25,7 @@ def init_run otp_password = if input[:with_mqtt] ::Proxy::Dynflow::OtpManager.generate_otp(execution_plan_id) end - input[:job_uuid] = job_storage.store_job(input[:hostname], execution_plan_id, run_step_id, input[:script]) + input[:job_uuid] = job_storage.store_job(host_name, execution_plan_id, run_step_id, input[:script]) output[:state] = :ready_for_pickup output[:result] = [] mqtt_start(otp_password) if input[:with_mqtt] @@ -86,10 +86,22 @@ def mqtt_start(otp_password) def mqtt_notify(payload) MQTT::Client.connect(settings.mqtt_broker, settings.mqtt_port) do |c| - c.publish("yggdrasil/#{input[:hostname]}/data/in", JSON.dump(payload), false, 1) + c.publish(mqtt_topic, JSON.dump(payload), false, 1) end end + def host_name + alternative_names = input.fetch(:alternative_names, {}) + + alternative_names[:consumer_uuid] || + alternative_names[:fqdn] || + input[:hostname] + end + + def mqtt_topic + "yggdrasil/#{host_name}/data/in" + end + def settings Proxy::RemoteExecution::Ssh::Plugin.settings end From 0f8a2be789014e203b98cb20f73855c1ee8efe5a Mon Sep 17 00:00:00 2001 From: Adam Ruzicka Date: Tue, 12 Oct 2021 10:20:30 +0200 Subject: [PATCH 10/12] Fixes #33682 - Slightly better handling of async_ssh setting --- lib/smart_proxy_remote_execution_ssh.rb | 14 ++++++++++++++ lib/smart_proxy_remote_execution_ssh/plugin.rb | 5 ----- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/lib/smart_proxy_remote_execution_ssh.rb b/lib/smart_proxy_remote_execution_ssh.rb index 3547e07..4f1fa85 100644 --- a/lib/smart_proxy_remote_execution_ssh.rb +++ b/lib/smart_proxy_remote_execution_ssh.rb @@ -40,6 +40,20 @@ def validate_mode! unless Plugin::MODES.include? Plugin.settings.mode raise "Mode has to be one of #{Plugin::MODES.join(', ')}, given #{Plugin.settings.mode}" end + + if Plugin.settings.async_ssh + Plugin.logger.warn('Option async_ssh is deprecated, use ssh-async mode instead.') + + case Plugin.settings.mode + when :ssh + Plugin.logger.warn('Deprecated option async_ssh used together with ssh mode, switching mode to ssh-async.') + Plugin.settings.mode = :'ssh-async' + when :'async-ssh' + # This is a noop + else + Plugin.logger.warn('Deprecated option async_ssh used together with incompatible mode, ignoring.') + end + end end def validate_mqtt_settings! diff --git a/lib/smart_proxy_remote_execution_ssh/plugin.rb b/lib/smart_proxy_remote_execution_ssh/plugin.rb index cbf24f7..195bfb3 100644 --- a/lib/smart_proxy_remote_execution_ssh/plugin.rb +++ b/lib/smart_proxy_remote_execution_ssh/plugin.rb @@ -35,11 +35,6 @@ class Plugin < Proxy::Plugin Proxy::RemoteExecution::Ssh.validate! - # TODO: Move into more native methods when available - if settings.async_ssh - logger.warn('Option async_ssh is deprecated, use ssh-async mode instead.') - end - Proxy::Dynflow::TaskLauncherRegistry.register('ssh', Proxy::Dynflow::TaskLauncher::Batch) end From 3a2c9debf6daaf60b27753b0000df2ce41b23859 Mon Sep 17 00:00:00 2001 From: Adam Ruzicka Date: Tue, 12 Oct 2021 14:48:21 +0200 Subject: [PATCH 11/12] Fixes #33682 - Drop cancellation for now --- .../actions/pull_script.rb | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb b/lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb index e2a3631..0059db4 100644 --- a/lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb +++ b/lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb @@ -56,11 +56,8 @@ def kill_run # Client was notified or is already running, dealing with this situation # is only supported if mqtt is available # Otherwise we have to wait it out - if input[:with_mqtt] - cleanup - payload = {} # TODO - mqtt_notify payload - end + # TODO + # if input[:with_mqtt] end suspend end From 0873377cd8d3b477915b60a3808581912187d6f9 Mon Sep 17 00:00:00 2001 From: Adam Ruzicka Date: Tue, 12 Oct 2021 14:51:51 +0200 Subject: [PATCH 12/12] Fixes #33682 - Address review comments --- lib/smart_proxy_remote_execution_ssh.rb | 1 - lib/smart_proxy_remote_execution_ssh/job_storage.rb | 1 + lib/smart_proxy_remote_execution_ssh/plugin.rb | 2 +- 3 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/smart_proxy_remote_execution_ssh.rb b/lib/smart_proxy_remote_execution_ssh.rb index 4f1fa85..c455a4a 100644 --- a/lib/smart_proxy_remote_execution_ssh.rb +++ b/lib/smart_proxy_remote_execution_ssh.rb @@ -2,7 +2,6 @@ require 'smart_proxy_remote_execution_ssh/version' require 'smart_proxy_remote_execution_ssh/plugin' require 'smart_proxy_remote_execution_ssh/webrick_ext' -require 'sequel' module Proxy::RemoteExecution module Ssh diff --git a/lib/smart_proxy_remote_execution_ssh/job_storage.rb b/lib/smart_proxy_remote_execution_ssh/job_storage.rb index 7503e07..b1358dd 100644 --- a/lib/smart_proxy_remote_execution_ssh/job_storage.rb +++ b/lib/smart_proxy_remote_execution_ssh/job_storage.rb @@ -1,4 +1,5 @@ # lib/job_storage.rb +require 'sequel' module Proxy::RemoteExecution::Ssh class JobStorage diff --git a/lib/smart_proxy_remote_execution_ssh/plugin.rb b/lib/smart_proxy_remote_execution_ssh/plugin.rb index 195bfb3..b363f7b 100644 --- a/lib/smart_proxy_remote_execution_ssh/plugin.rb +++ b/lib/smart_proxy_remote_execution_ssh/plugin.rb @@ -1,7 +1,7 @@ module Proxy::RemoteExecution::Ssh class Plugin < Proxy::Plugin SSH_LOG_LEVELS = %w[debug info warn error fatal].freeze - MODES = %i[ssh async-ssh pull pull-mqtt] + MODES = %i[ssh async-ssh pull pull-mqtt].freeze http_rackup_path File.expand_path("http_config.ru", File.expand_path("../", __FILE__)) https_rackup_path File.expand_path("http_config.ru", File.expand_path("../", __FILE__))