Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions lib/smart_proxy_remote_execution_ssh.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ def validate!
raise "Ssh public key file #{public_key_file} doesn't exist"
end

validate_mode!
validate_ssh_log_level!
validate_mqtt_settings!
end

def private_key_file
Expand All @@ -31,6 +33,35 @@ def public_key_file
File.expand_path("#{private_key_file}.pub")
end

def validate_mode!
Plugin.settings.mode = Plugin.settings.mode.to_sym

unless Plugin::MODES.include? Plugin.settings.mode
raise "Mode has to be one of #{Plugin::MODES.join(', ')}, given #{Plugin.settings.mode}"
end

if Plugin.settings.async_ssh
Plugin.logger.warn('Option async_ssh is deprecated, use ssh-async mode instead.')

case Plugin.settings.mode
when :ssh
Plugin.logger.warn('Deprecated option async_ssh used together with ssh mode, switching mode to ssh-async.')
Plugin.settings.mode = :'ssh-async'
when :'async-ssh'
# This is a noop
else
Plugin.logger.warn('Deprecated option async_ssh used together with incompatible mode, ignoring.')
end
end
end

def validate_mqtt_settings!
return unless Plugin.settings.mode == :'pull-mqtt'

raise 'mqtt_broker has to be set when pull-mqtt mode is used' if Plugin.settings.mqtt_broker.nil?
raise 'mqtt_port has to be set when pull-mqtt mode is used' if Plugin.settings.mqtt_port.nil?
end

def validate_ssh_log_level!
wanted_level = Plugin.settings.ssh_log_level.to_s
levels = Plugin::SSH_LOG_LEVELS
Expand All @@ -51,6 +82,10 @@ def validate_ssh_log_level!

Plugin.settings.ssh_log_level = Plugin.settings.ssh_log_level.to_sym
end

def job_storage
@job_storage ||= Proxy::RemoteExecution::Ssh::JobStorage.new
end
end
end
end
6 changes: 6 additions & 0 deletions lib/smart_proxy_remote_execution_ssh/actions.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
module Proxy::RemoteExecution::Ssh
module Actions
require 'smart_proxy_remote_execution_ssh/actions/run_script'
require 'smart_proxy_remote_execution_ssh/actions/pull_script'
end
end
110 changes: 110 additions & 0 deletions lib/smart_proxy_remote_execution_ssh/actions/pull_script.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
require 'mqtt'
require 'json'

module Proxy::RemoteExecution::Ssh::Actions
class PullScript < Proxy::Dynflow::Action::Runner
JobDelivered = Class.new

execution_plan_hooks.use :cleanup, :on => :stopped

def plan(action_input, mqtt: false)
super(action_input)
input[:with_mqtt] = mqtt
end

def run(event = nil)
if event == JobDelivered
output[:state] = :delivered
suspend
else
super
end
end

def init_run
otp_password = if input[:with_mqtt]
::Proxy::Dynflow::OtpManager.generate_otp(execution_plan_id)
end
input[:job_uuid] = job_storage.store_job(host_name, execution_plan_id, run_step_id, input[:script])
output[:state] = :ready_for_pickup
output[:result] = []
mqtt_start(otp_password) if input[:with_mqtt]
suspend
end

def cleanup(_plan = nil)
job_storage.drop_job(execution_plan_id, run_step_id)
Proxy::Dynflow::OtpManager.passwords.delete(execution_plan_id)
end

def process_external_event(event)
output[:state] = :running
data = event.data
continuous_output = Proxy::Dynflow::ContinuousOutput.new
Array(data['output']).each { |line| continuous_output.add_output(line, 'stdout') } if data.key?('output')
exit_code = data['exit_code'].to_i if data['exit_code']
process_update(Proxy::Dynflow::Runner::Update.new(continuous_output, exit_code))
end

def kill_run
case output[:state]
when :ready_for_pickup
# If the job is not running yet on the client, wipe it from storage
cleanup
# TODO: Stop the action
when :notified, :running
# Client was notified or is already running, dealing with this situation
# is only supported if mqtt is available
# Otherwise we have to wait it out
# TODO
# if input[:with_mqtt]
end
suspend
end

def mqtt_start(otp_password)
payload = {
type: 'data',
message_id: SecureRandom.uuid,
version: 1,
sent: DateTime.now.iso8601,
directive: 'foreman',
metadata: {
'job_uuid': input[:job_uuid],
'username': execution_plan_id,
'password': otp_password,
'return_url': "#{input[:proxy_url]}/ssh/jobs/#{input[:job_uuid]}/update",
},
content: "#{input[:proxy_url]}/ssh/jobs/#{input[:job_uuid]}",
}
mqtt_notify payload
output[:state] = :notified
end

def mqtt_notify(payload)
MQTT::Client.connect(settings.mqtt_broker, settings.mqtt_port) do |c|
c.publish(mqtt_topic, JSON.dump(payload), false, 1)
end
end

def host_name
alternative_names = input.fetch(:alternative_names, {})

alternative_names[:consumer_uuid] ||
alternative_names[:fqdn] ||
input[:hostname]
end

def mqtt_topic
"yggdrasil/#{host_name}/data/in"
end

def settings
Proxy::RemoteExecution::Ssh::Plugin.settings
end

def job_storage
Proxy::RemoteExecution::Ssh.job_storage
end
end
end
15 changes: 14 additions & 1 deletion lib/smart_proxy_remote_execution_ssh/actions/run_script.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,20 @@

module Proxy::RemoteExecution::Ssh
module Actions
class RunScript < Proxy::Dynflow::Action::Runner
class RunScript < ::Dynflow::Action
def plan(*args)
mode = Proxy::RemoteExecution::Ssh::Plugin.settings.mode
case mode
when :ssh, :'ssh-async'
plan_action(ScriptRunner, *args)
when :pull, :'pull-mqtt'
plan_action(PullScript, *args,
mqtt: mode == :'pull-mqtt')
end
end
end

class ScriptRunner < Proxy::Dynflow::Action::Runner
def initiate_runner
additional_options = {
:step_id => run_step_id,
Expand Down
49 changes: 49 additions & 0 deletions lib/smart_proxy_remote_execution_ssh/api.rb
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
require 'net/ssh'
require 'base64'
require 'smart_proxy_dynflow/runner'

module Proxy::RemoteExecution
module Ssh

class Api < ::Sinatra::Base
include Sinatra::Authorization::Helpers
include Proxy::Dynflow::Helpers

get "/pubkey" do
File.read(Ssh.public_key_file)
Expand Down Expand Up @@ -37,6 +39,53 @@ class Api < ::Sinatra::Base
end
204
end

# Payload is a hash where
# exit_code: Integer | NilClass
# output: String
post '/jobs/:job_uuid/update' do |job_uuid|
do_authorize_with_ssl_client

with_authorized_job(job_uuid) do |job_record|
data = MultiJson.load(request.body.read)
notify_job(job_record, ::Proxy::Dynflow::Runner::ExternalEvent.new(data))
end
end

get '/jobs' do
do_authorize_with_ssl_client

MultiJson.dump(Proxy::RemoteExecution::Ssh.job_storage.job_uuids_for_host(https_cert_cn))
end

get "/jobs/:job_uuid" do |job_uuid|
do_authorize_with_ssl_client

with_authorized_job(job_uuid) do |job_record|
notify_job(job_record, Actions::PullScript::JobDelivered)
job_record[:job]
end
end

private

def notify_job(job_record, event)
world.event(job_record[:execution_plan_uuid], job_record[:run_step_id], event)
end

def with_authorized_job(uuid)
if (job = authorized_job(uuid))
yield job
else
halt 404
end
end

def authorized_job(uuid)
job_record = Proxy::RemoteExecution::Ssh.job_storage.find_job(uuid) || {}
return job_record if authorize_with_token(clear: false, task_id: job_record[:execution_plan_uuid]) ||
job_record[:hostname] == https_cert_cn
end
end
end
end
51 changes: 51 additions & 0 deletions lib/smart_proxy_remote_execution_ssh/job_storage.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# lib/job_storage.rb
require 'sequel'

module Proxy::RemoteExecution::Ssh
class JobStorage
def initialize
@db = Sequel.sqlite
@db.create_table :jobs do
DateTime :timestamp, null: false, default: Sequel::CURRENT_TIMESTAMP
String :uuid, fixed: true, size: 36, primary_key: true, null: false
String :hostname, null: false, index: true
String :execution_plan_uuid, fixed: true, size: 36, null: false, index: true
Integer :run_step_id, null: false
String :job, text: true
end
end

def find_job(uuid)
jobs.where(uuid: uuid).first
end

def job_uuids_for_host(hostname)
jobs_for_host(hostname).order(:timestamp)
.select_map(:uuid)
end

def store_job(hostname, execution_plan_uuid, run_step_id, job, uuid: SecureRandom.uuid, timestamp: Time.now.utc)
jobs.insert(timestamp: timestamp,
uuid: uuid,
hostname: hostname,
execution_plan_uuid: execution_plan_uuid,
run_step_id: run_step_id,
job: job)
uuid
end

def drop_job(execution_plan_uuid, run_step_id)
jobs.where(execution_plan_uuid: execution_plan_uuid, run_step_id: run_step_id).delete
end

private

def jobs_for_host(hostname)
jobs.where(hostname: hostname)
end

def jobs
@db[:jobs]
end
end
end
12 changes: 8 additions & 4 deletions lib/smart_proxy_remote_execution_ssh/plugin.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
module Proxy::RemoteExecution::Ssh
class Plugin < Proxy::Plugin
SSH_LOG_LEVELS = %w[debug info warn error fatal].freeze
MODES = %i[ssh async-ssh pull pull-mqtt].freeze

http_rackup_path File.expand_path("http_config.ru", File.expand_path("../", __FILE__))
https_rackup_path File.expand_path("http_config.ru", File.expand_path("../", __FILE__))
Expand All @@ -11,23 +12,26 @@ class Plugin < Proxy::Plugin
:remote_working_dir => '/var/tmp',
:local_working_dir => '/var/tmp',
:kerberos_auth => false,
:async_ssh => false,
# When set to nil, makes REX use the runner's default interval
# :runner_refresh_interval => nil,
:ssh_log_level => :fatal,
:cleanup_working_dirs => true
:cleanup_working_dirs => true,
# :mqtt_broker => nil,
# :mqtt_port => nil,
:mode => :ssh

plugin :ssh, Proxy::RemoteExecution::Ssh::VERSION
after_activation do
require 'smart_proxy_dynflow'
require 'smart_proxy_remote_execution_ssh/version'
require 'smart_proxy_remote_execution_ssh/cockpit'
require 'smart_proxy_remote_execution_ssh/api'
require 'smart_proxy_remote_execution_ssh/actions/run_script'
require 'smart_proxy_remote_execution_ssh/actions'
require 'smart_proxy_remote_execution_ssh/dispatcher'
require 'smart_proxy_remote_execution_ssh/log_filter'
require 'smart_proxy_remote_execution_ssh/runners'
require 'smart_proxy_remote_execution_ssh/utils'
require 'smart_proxy_remote_execution_ssh/job_storage'

Proxy::RemoteExecution::Ssh.validate!

Expand All @@ -41,7 +45,7 @@ def self.simulate?
def self.runner_class
@runner_class ||= if simulate?
Runners::FakeScriptRunner
elsif settings[:async_ssh]
elsif settings.mode == :'ssh-async'
Runners::PollingScriptRunner
else
Runners::ScriptRunner
Expand Down
8 changes: 7 additions & 1 deletion settings.d/remote_execution_ssh.yml.example
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
:local_working_dir: '/var/tmp'
:remote_working_dir: '/var/tmp'
# :kerberos_auth: false
# :async_ssh: false

# Mode of operation, one of ssh, ssh-async, pull, pull-mqtt
:mode: ssh

# Defines how often (in seconds) should the runner check
# for new data leave empty to use the runner's default
Expand All @@ -18,3 +20,7 @@

# Remove working directories on job completion
# :cleanup_working_dirs: true

# MQTT configuration, need to be set if mode is set to pull-mqtt
# :mqtt_broker: localhost
# :mqtt_port: 1883
1 change: 1 addition & 0 deletions smart_proxy_remote_execution_ssh.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,5 @@ Gem::Specification.new do |gem|

gem.add_runtime_dependency('smart_proxy_dynflow', '~> 0.5')
gem.add_runtime_dependency('net-ssh', '>= 4.2.0')
gem.add_runtime_dependency('mqtt')
end
Loading