Skip to content

Commit

Permalink
Deeper config checking and fixes for config format
Browse files Browse the repository at this point in the history
  • Loading branch information
sodabrew committed Jan 23, 2015
1 parent 3be591a commit d09a9c1
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 17 deletions.
64 changes: 51 additions & 13 deletions code/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,29 @@ class QueueConfig < Struct.new(
:blocking_params,
:schedule
)

def to_json
{
'name' => name,
'script' => script,
'num_workers' => num_workers,
'exec_prefix' => exec_prefix,
'env_vars' => env_vars,
'coalesce_params' => coalesce_params,
'blocking_params' => blocking_params,
'schedule' => schedule.map do |s|
{
'cron' => s['cron'],
'params' => {
'param1' => s['param1'],
'param2' => s['param2'],
'param3' => s['param3'],
'param4' => s['param4'],
}
}
end
}.to_json
end
end

class Queue
Expand Down Expand Up @@ -103,8 +126,11 @@ def self.delete(name)
end

def self.create(options, config_path=nil)
# Validate the options
config = sublimate_config(options)

# Create a directories and config
queue_path = "queue/#{options['name']}"
queue_path = "queue/#{config['name']}"
FileUtils.mkdir_p(queue_path)
FileUtils.mkdir_p(queue_path + '/prep')
FileUtils.mkdir_p(queue_path + '/que')
Expand All @@ -119,10 +145,10 @@ def self.create(options, config_path=nil)
else
# Write config to dir
File.open(queue_path + '/config.json', "w") do |f|
f.write(options.to_json)
f.write(config.to_json)
end
end
RQ::Queue.start_process(options)
RQ::Queue.start_process(config)
end

def self.start_process(options)
Expand Down Expand Up @@ -357,10 +383,8 @@ def run_queue_script!(msg)
ENV["RQ_FORCE_REMOTE"] = "1" if msg['force_remote']

# Set env vars specified in queue config file
if @config.env_vars
@config.env_vars.each do |varname, value|
ENV[varname] = value unless varname.match(/^RQ_/) # Don't let the config override RQ-specific env vars though
end
@config.env_vars.each do |varname, value|
ENV[varname] = value unless varname.match(/^RQ_/) # Don't let the config override RQ-specific env vars though
end

# unset RUBYOPT so it doesn't reinitialize the client ruby's GEM_HOME, etc.
Expand Down Expand Up @@ -435,30 +459,44 @@ def load_rq_config

def load_config
data = File.read(File.join(@queue_path, 'config.json'))
@config = sublimate_config(JSON.parse(data))
@config = self.class.sublimate_config(JSON.parse(data))
end

# There are a variety of ways we used to specify truth.
# Going forward, javascript true / false are preferred.
def so_truthy? fudge
def self.so_truthy? fudge
!!([true, 'true', 'yes', '1', 1].include? fudge)
end

def sublimate_config(conf)
def self.sublimate_config(conf)
# TODO config validation
new_config = QueueConfig.new
new_config.name = conf['name']
new_config.script = conf['script']
new_config.num_workers = conf['num_workers'].to_i
new_config.exec_prefix = conf['exec_prefix']
new_config.env_vars = conf['env_vars']

if conf['env_vars'].is_a? Hash
new_config.env_vars = conf['env_vars']
else
new_config.env_vars = {}
end

if so_truthy?(conf['coalesce'])
# Convert from old style coalesce / coalesce_paramN
new_config.coalesce_params = (1..4).map{ |x| x if so_truthy?(conf["coalesce_param#{x}"]) }.compact
elsif conf['coalesce_params'].is_a? Array
new_config.coalesce_params = conf['coalesce_params'].map(&:to_i)
else
new_config.coalesce_params = []
end

if conf['blocking_params'].is_a? Array
new_config.blocking_params = conf['blocking_params'].map(&:to_i)
else
new_config.coalesce_params = conf['coalesce_params']
new_config.blocking_params = []
end
new_config.blocking_params = conf['blocking_params']

new_config.schedule = (conf['schedule'] || []).map do |s|
begin
{
Expand Down
9 changes: 5 additions & 4 deletions test/test_queues_setup.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ def test_key(que_name, under_test, key, expected)
#{"script":"./test/test_script.sh","name":"test","url":"http://127.0.0.1:3333/","num_workers":"1"}
#
test_key 'test', result, 'name', 'test'
test_key 'test', result, 'num_workers', '1'
test_key 'test', result, 'num_workers', 1
test_key 'test', result, 'script', './test/test_script.sh'
test_key 'test', result, 'coalesce', 'no'
test_key 'test', result, 'coalesce_params', []
test_key 'test', result, 'blocking_params', []
test_key 'test', result, 'exec_prefix', ''


Expand All @@ -59,8 +60,8 @@ def test_key(que_name, under_test, key, expected)
result = JSON.parse(res.body)

test_key 'test_coalesce', result, 'name', 'test_coalesce'
test_key 'test_coalesce', result, 'num_workers', '1'
test_key 'test_coalesce', result, 'num_workers', 1
test_key 'test_coalesce', result, 'script', './test/test_script.sh'
test_key 'test_coalesce', result, 'exec_prefix', ''
test_key 'test_coalesce', result, 'coalesce_params', []
test_key 'test_coalesce', result, 'coalesce_params', [1]
test_key 'test_coalesce', result, 'blocking_params', []

0 comments on commit d09a9c1

Please sign in to comment.