diff --git a/lib/smart_proxy_remote_execution_ssh.rb b/lib/smart_proxy_remote_execution_ssh.rb index e430863..c455a4a 100644 --- a/lib/smart_proxy_remote_execution_ssh.rb +++ b/lib/smart_proxy_remote_execution_ssh.rb @@ -20,7 +20,9 @@ def validate! raise "Ssh public key file #{public_key_file} doesn't exist" end + validate_mode! validate_ssh_log_level! + validate_mqtt_settings! end def private_key_file @@ -31,6 +33,35 @@ def public_key_file File.expand_path("#{private_key_file}.pub") end + def validate_mode! + Plugin.settings.mode = Plugin.settings.mode.to_sym + + unless Plugin::MODES.include? Plugin.settings.mode + raise "Mode has to be one of #{Plugin::MODES.join(', ')}, given #{Plugin.settings.mode}" + end + + if Plugin.settings.async_ssh + Plugin.logger.warn('Option async_ssh is deprecated, use ssh-async mode instead.') + + case Plugin.settings.mode + when :ssh + Plugin.logger.warn('Deprecated option async_ssh used together with ssh mode, switching mode to ssh-async.') + Plugin.settings.mode = :'ssh-async' + when :'async-ssh' + # This is a noop + else + Plugin.logger.warn('Deprecated option async_ssh used together with incompatible mode, ignoring.') + end + end + end + + def validate_mqtt_settings! + return unless Plugin.settings.mode == :'pull-mqtt' + + raise 'mqtt_broker has to be set when pull-mqtt mode is used' if Plugin.settings.mqtt_broker.nil? + raise 'mqtt_port has to be set when pull-mqtt mode is used' if Plugin.settings.mqtt_port.nil? + end + def validate_ssh_log_level! wanted_level = Plugin.settings.ssh_log_level.to_s levels = Plugin::SSH_LOG_LEVELS @@ -51,6 +82,10 @@ def validate_ssh_log_level! Plugin.settings.ssh_log_level = Plugin.settings.ssh_log_level.to_sym end + + def job_storage + @job_storage ||= Proxy::RemoteExecution::Ssh::JobStorage.new + end end end end diff --git a/lib/smart_proxy_remote_execution_ssh/actions.rb b/lib/smart_proxy_remote_execution_ssh/actions.rb new file mode 100644 index 0000000..1ec6cd6 --- /dev/null +++ b/lib/smart_proxy_remote_execution_ssh/actions.rb @@ -0,0 +1,6 @@ +module Proxy::RemoteExecution::Ssh + module Actions + require 'smart_proxy_remote_execution_ssh/actions/run_script' + require 'smart_proxy_remote_execution_ssh/actions/pull_script' + end +end diff --git a/lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb b/lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb new file mode 100644 index 0000000..0059db4 --- /dev/null +++ b/lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb @@ -0,0 +1,110 @@ +require 'mqtt' +require 'json' + +module Proxy::RemoteExecution::Ssh::Actions + class PullScript < Proxy::Dynflow::Action::Runner + JobDelivered = Class.new + + execution_plan_hooks.use :cleanup, :on => :stopped + + def plan(action_input, mqtt: false) + super(action_input) + input[:with_mqtt] = mqtt + end + + def run(event = nil) + if event == JobDelivered + output[:state] = :delivered + suspend + else + super + end + end + + def init_run + otp_password = if input[:with_mqtt] + ::Proxy::Dynflow::OtpManager.generate_otp(execution_plan_id) + end + input[:job_uuid] = job_storage.store_job(host_name, execution_plan_id, run_step_id, input[:script]) + output[:state] = :ready_for_pickup + output[:result] = [] + mqtt_start(otp_password) if input[:with_mqtt] + suspend + end + + def cleanup(_plan = nil) + job_storage.drop_job(execution_plan_id, run_step_id) + Proxy::Dynflow::OtpManager.passwords.delete(execution_plan_id) + end + + def process_external_event(event) + output[:state] = :running + data = event.data + continuous_output = Proxy::Dynflow::ContinuousOutput.new + Array(data['output']).each { |line| continuous_output.add_output(line, 'stdout') } if data.key?('output') + exit_code = data['exit_code'].to_i if data['exit_code'] + process_update(Proxy::Dynflow::Runner::Update.new(continuous_output, exit_code)) + end + + def kill_run + case output[:state] + when :ready_for_pickup + # If the job is not running yet on the client, wipe it from storage + cleanup + # TODO: Stop the action + when :notified, :running + # Client was notified or is already running, dealing with this situation + # is only supported if mqtt is available + # Otherwise we have to wait it out + # TODO + # if input[:with_mqtt] + end + suspend + end + + def mqtt_start(otp_password) + payload = { + type: 'data', + message_id: SecureRandom.uuid, + version: 1, + sent: DateTime.now.iso8601, + directive: 'foreman', + metadata: { + 'job_uuid': input[:job_uuid], + 'username': execution_plan_id, + 'password': otp_password, + 'return_url': "#{input[:proxy_url]}/ssh/jobs/#{input[:job_uuid]}/update", + }, + content: "#{input[:proxy_url]}/ssh/jobs/#{input[:job_uuid]}", + } + mqtt_notify payload + output[:state] = :notified + end + + def mqtt_notify(payload) + MQTT::Client.connect(settings.mqtt_broker, settings.mqtt_port) do |c| + c.publish(mqtt_topic, JSON.dump(payload), false, 1) + end + end + + def host_name + alternative_names = input.fetch(:alternative_names, {}) + + alternative_names[:consumer_uuid] || + alternative_names[:fqdn] || + input[:hostname] + end + + def mqtt_topic + "yggdrasil/#{host_name}/data/in" + end + + def settings + Proxy::RemoteExecution::Ssh::Plugin.settings + end + + def job_storage + Proxy::RemoteExecution::Ssh.job_storage + end + end +end diff --git a/lib/smart_proxy_remote_execution_ssh/actions/run_script.rb b/lib/smart_proxy_remote_execution_ssh/actions/run_script.rb index 6de585a..45448c2 100644 --- a/lib/smart_proxy_remote_execution_ssh/actions/run_script.rb +++ b/lib/smart_proxy_remote_execution_ssh/actions/run_script.rb @@ -3,7 +3,20 @@ module Proxy::RemoteExecution::Ssh module Actions - class RunScript < Proxy::Dynflow::Action::Runner + class RunScript < ::Dynflow::Action + def plan(*args) + mode = Proxy::RemoteExecution::Ssh::Plugin.settings.mode + case mode + when :ssh, :'ssh-async' + plan_action(ScriptRunner, *args) + when :pull, :'pull-mqtt' + plan_action(PullScript, *args, + mqtt: mode == :'pull-mqtt') + end + end + end + + class ScriptRunner < Proxy::Dynflow::Action::Runner def initiate_runner additional_options = { :step_id => run_step_id, diff --git a/lib/smart_proxy_remote_execution_ssh/api.rb b/lib/smart_proxy_remote_execution_ssh/api.rb index 52f82bc..7af69b4 100644 --- a/lib/smart_proxy_remote_execution_ssh/api.rb +++ b/lib/smart_proxy_remote_execution_ssh/api.rb @@ -1,11 +1,13 @@ require 'net/ssh' require 'base64' +require 'smart_proxy_dynflow/runner' module Proxy::RemoteExecution module Ssh class Api < ::Sinatra::Base include Sinatra::Authorization::Helpers + include Proxy::Dynflow::Helpers get "/pubkey" do File.read(Ssh.public_key_file) @@ -37,6 +39,53 @@ class Api < ::Sinatra::Base end 204 end + + # Payload is a hash where + # exit_code: Integer | NilClass + # output: String + post '/jobs/:job_uuid/update' do |job_uuid| + do_authorize_with_ssl_client + + with_authorized_job(job_uuid) do |job_record| + data = MultiJson.load(request.body.read) + notify_job(job_record, ::Proxy::Dynflow::Runner::ExternalEvent.new(data)) + end + end + + get '/jobs' do + do_authorize_with_ssl_client + + MultiJson.dump(Proxy::RemoteExecution::Ssh.job_storage.job_uuids_for_host(https_cert_cn)) + end + + get "/jobs/:job_uuid" do |job_uuid| + do_authorize_with_ssl_client + + with_authorized_job(job_uuid) do |job_record| + notify_job(job_record, Actions::PullScript::JobDelivered) + job_record[:job] + end + end + + private + + def notify_job(job_record, event) + world.event(job_record[:execution_plan_uuid], job_record[:run_step_id], event) + end + + def with_authorized_job(uuid) + if (job = authorized_job(uuid)) + yield job + else + halt 404 + end + end + + def authorized_job(uuid) + job_record = Proxy::RemoteExecution::Ssh.job_storage.find_job(uuid) || {} + return job_record if authorize_with_token(clear: false, task_id: job_record[:execution_plan_uuid]) || + job_record[:hostname] == https_cert_cn + end end end end diff --git a/lib/smart_proxy_remote_execution_ssh/job_storage.rb b/lib/smart_proxy_remote_execution_ssh/job_storage.rb new file mode 100644 index 0000000..b1358dd --- /dev/null +++ b/lib/smart_proxy_remote_execution_ssh/job_storage.rb @@ -0,0 +1,51 @@ +# lib/job_storage.rb +require 'sequel' + +module Proxy::RemoteExecution::Ssh + class JobStorage + def initialize + @db = Sequel.sqlite + @db.create_table :jobs do + DateTime :timestamp, null: false, default: Sequel::CURRENT_TIMESTAMP + String :uuid, fixed: true, size: 36, primary_key: true, null: false + String :hostname, null: false, index: true + String :execution_plan_uuid, fixed: true, size: 36, null: false, index: true + Integer :run_step_id, null: false + String :job, text: true + end + end + + def find_job(uuid) + jobs.where(uuid: uuid).first + end + + def job_uuids_for_host(hostname) + jobs_for_host(hostname).order(:timestamp) + .select_map(:uuid) + end + + def store_job(hostname, execution_plan_uuid, run_step_id, job, uuid: SecureRandom.uuid, timestamp: Time.now.utc) + jobs.insert(timestamp: timestamp, + uuid: uuid, + hostname: hostname, + execution_plan_uuid: execution_plan_uuid, + run_step_id: run_step_id, + job: job) + uuid + end + + def drop_job(execution_plan_uuid, run_step_id) + jobs.where(execution_plan_uuid: execution_plan_uuid, run_step_id: run_step_id).delete + end + + private + + def jobs_for_host(hostname) + jobs.where(hostname: hostname) + end + + def jobs + @db[:jobs] + end + end +end diff --git a/lib/smart_proxy_remote_execution_ssh/plugin.rb b/lib/smart_proxy_remote_execution_ssh/plugin.rb index 7e5d182..b363f7b 100644 --- a/lib/smart_proxy_remote_execution_ssh/plugin.rb +++ b/lib/smart_proxy_remote_execution_ssh/plugin.rb @@ -1,6 +1,7 @@ module Proxy::RemoteExecution::Ssh class Plugin < Proxy::Plugin SSH_LOG_LEVELS = %w[debug info warn error fatal].freeze + MODES = %i[ssh async-ssh pull pull-mqtt].freeze http_rackup_path File.expand_path("http_config.ru", File.expand_path("../", __FILE__)) https_rackup_path File.expand_path("http_config.ru", File.expand_path("../", __FILE__)) @@ -11,11 +12,13 @@ class Plugin < Proxy::Plugin :remote_working_dir => '/var/tmp', :local_working_dir => '/var/tmp', :kerberos_auth => false, - :async_ssh => false, # When set to nil, makes REX use the runner's default interval # :runner_refresh_interval => nil, :ssh_log_level => :fatal, - :cleanup_working_dirs => true + :cleanup_working_dirs => true, + # :mqtt_broker => nil, + # :mqtt_port => nil, + :mode => :ssh plugin :ssh, Proxy::RemoteExecution::Ssh::VERSION after_activation do @@ -23,11 +26,12 @@ class Plugin < Proxy::Plugin require 'smart_proxy_remote_execution_ssh/version' require 'smart_proxy_remote_execution_ssh/cockpit' require 'smart_proxy_remote_execution_ssh/api' - require 'smart_proxy_remote_execution_ssh/actions/run_script' + require 'smart_proxy_remote_execution_ssh/actions' require 'smart_proxy_remote_execution_ssh/dispatcher' require 'smart_proxy_remote_execution_ssh/log_filter' require 'smart_proxy_remote_execution_ssh/runners' require 'smart_proxy_remote_execution_ssh/utils' + require 'smart_proxy_remote_execution_ssh/job_storage' Proxy::RemoteExecution::Ssh.validate! @@ -41,7 +45,7 @@ def self.simulate? def self.runner_class @runner_class ||= if simulate? Runners::FakeScriptRunner - elsif settings[:async_ssh] + elsif settings.mode == :'ssh-async' Runners::PollingScriptRunner else Runners::ScriptRunner diff --git a/settings.d/remote_execution_ssh.yml.example b/settings.d/remote_execution_ssh.yml.example index 7d0412d..d4214ad 100644 --- a/settings.d/remote_execution_ssh.yml.example +++ b/settings.d/remote_execution_ssh.yml.example @@ -4,7 +4,9 @@ :local_working_dir: '/var/tmp' :remote_working_dir: '/var/tmp' # :kerberos_auth: false -# :async_ssh: false + +# Mode of operation, one of ssh, ssh-async, pull, pull-mqtt +:mode: ssh # Defines how often (in seconds) should the runner check # for new data leave empty to use the runner's default @@ -18,3 +20,7 @@ # Remove working directories on job completion # :cleanup_working_dirs: true + +# MQTT configuration, need to be set if mode is set to pull-mqtt +# :mqtt_broker: localhost +# :mqtt_port: 1883 diff --git a/smart_proxy_remote_execution_ssh.gemspec b/smart_proxy_remote_execution_ssh.gemspec index dda2845..04947df 100644 --- a/smart_proxy_remote_execution_ssh.gemspec +++ b/smart_proxy_remote_execution_ssh.gemspec @@ -31,4 +31,5 @@ Gem::Specification.new do |gem| gem.add_runtime_dependency('smart_proxy_dynflow', '~> 0.5') gem.add_runtime_dependency('net-ssh', '>= 4.2.0') + gem.add_runtime_dependency('mqtt') end diff --git a/test/api_test.rb b/test/api_test.rb index 81c3d6d..b39c8bf 100644 --- a/test/api_test.rb +++ b/test/api_test.rb @@ -1,5 +1,8 @@ require 'test_helper' require 'tempfile' +require 'smart_proxy_remote_execution_ssh/actions/pull_script' +require 'smart_proxy_remote_execution_ssh/job_storage' +require 'smart_proxy_dynflow/otp_manager' KNOWN_HOSTS = < 1 + _(last_response.status).must_equal 403 + end + + it 'returns 403 if wrong credentials are supplied' do + auth = Proxy::Dynflow::OtpManager.tokenize('username', 'password') + post '/jobs/12345/update', {}, 'HTTP_AUTHORIZATION' => "Basic #{auth}" + _(last_response.status).must_equal 403 + end + + it 'returns 404 if job does not exist' do + Proxy::RemoteExecution::Ssh::Api.any_instance.expects(:https_cert_cn).returns(hostname) + post '/jobs/12345/update', {} + _(last_response.status).must_equal 404 + end + + it 'supports http basic auth' do + pass = Proxy::Dynflow::OtpManager.generate_otp(execution_plan_uuid) + auth = Proxy::Dynflow::OtpManager.tokenize(execution_plan_uuid, pass) + + fake_world = mock + fake_world.expects(:event) do |task_id, step_id, _payload| + task_id == execution_plan_uuid && step_id == run_step_id + end + Proxy::RemoteExecution::Ssh::Api.any_instance.expects(:world).returns(fake_world) + + post "/jobs/#{uuid}/update", '{}', 'HTTP_AUTHORIZATION' => "Basic #{auth}" + _(last_response.status).must_equal 200 + + Proxy::Dynflow::OtpManager.passwords.delete(execution_plan_uuid) + end + + it 'dispatches an event' do + Proxy::RemoteExecution::Ssh::Api.any_instance.expects(:https_cert_cn).returns(hostname) + fake_world = mock + fake_world.expects(:event) do |task_id, step_id, _payload| + task_id == execution_plan_uuid && step_id == run_step_id + end + Proxy::RemoteExecution::Ssh::Api.any_instance.expects(:world).returns(fake_world) + + post "/jobs/#{uuid}/update", '{}' + _(last_response.status).must_equal 200 + end + end + + describe '/jobs/:job_uuid' do + it 'returns 403 if HTTPS is used and no cert is provided' do + get '/jobs/12345', {}, 'HTTPS' => 1 + _(last_response.status).must_equal 403 + end + + it 'returns 403 when wrong credentials are supplied' do + auth = Proxy::Dynflow::OtpManager.tokenize('username', 'password') + get '/jobs/12345', {}, 'HTTP_AUTHORIZATION' => "Basic #{auth}" + _(last_response.status).must_equal 403 + end + + it 'returns content if there is some and notifies the action when using password' do + pass = Proxy::Dynflow::OtpManager.generate_otp(execution_plan_uuid) + auth = Proxy::Dynflow::OtpManager.tokenize(execution_plan_uuid, pass) + + fake_world = mock + fake_world.expects(:event).with(execution_plan_uuid, run_step_id, Actions::PullScript::JobDelivered) + Proxy::RemoteExecution::Ssh::Api.any_instance.expects(:world).returns(fake_world) + + get "/jobs/#{uuid}", {}, 'HTTP_AUTHORIZATION' => "Basic #{auth}" + _(last_response.status).must_equal 200 + _(last_response.body).must_equal content + + Proxy::Dynflow::OtpManager.passwords.delete(execution_plan_uuid) + end + + it 'returns content if there is some and notifies the action' do + Proxy::RemoteExecution::Ssh::Api.any_instance.expects(:https_cert_cn).returns(hostname) + fake_world = mock + fake_world.expects(:event).with(execution_plan_uuid, run_step_id, Actions::PullScript::JobDelivered) + Proxy::RemoteExecution::Ssh::Api.any_instance.expects(:world).returns(fake_world) + + get "/jobs/#{uuid}" + _(last_response.status).must_equal 200 + _(last_response.body).must_equal content + end + + it 'returns 404 if there is no content' do + Proxy::RemoteExecution::Ssh::Api.any_instance.expects(:https_cert_cn).returns(hostname) + + get '/jobs/12345' + _(last_response.status).must_equal 404 + end + end + + describe '/jobs' do + it 'returns 403 if HTTPS is used and no cert is provided' do + get '/jobs', {}, 'HTTPS' => 1 + _(last_response.status).must_equal 403 + end + + it 'returns a list of job uuids for a given host' do + Proxy::RemoteExecution::Ssh::Api.any_instance.expects(:https_cert_cn).returns(hostname) + Proxy::RemoteExecution::Ssh + .job_storage + .store_job('another.host', SecureRandom.uuid, 1, 'hello') + + get '/jobs' + _(last_response.status).must_equal 200 + data = MultiJson.load(last_response.body) + _(data).must_equal [uuid] + end + end + end end end