diff --git a/code/queue.rb b/code/queue.rb index 287e7ad..d18f04a 100644 --- a/code/queue.rb +++ b/code/queue.rb @@ -32,6 +32,29 @@ class QueueConfig < Struct.new( :blocking_params, :schedule ) + + def to_json + { + 'name' => name, + 'script' => script, + 'num_workers' => num_workers, + 'exec_prefix' => exec_prefix, + 'env_vars' => env_vars, + 'coalesce_params' => coalesce_params, + 'blocking_params' => blocking_params, + 'schedule' => schedule.map do |s| + { + 'cron' => s['cron'], + 'params' => { + 'param1' => s['param1'], + 'param2' => s['param2'], + 'param3' => s['param3'], + 'param4' => s['param4'], + } + } + end + }.to_json + end end class Queue @@ -103,8 +126,11 @@ def self.delete(name) end def self.create(options, config_path=nil) + # Validate the options + config = sublimate_config(options) + # Create a directories and config - queue_path = "queue/#{options['name']}" + queue_path = "queue/#{config['name']}" FileUtils.mkdir_p(queue_path) FileUtils.mkdir_p(queue_path + '/prep') FileUtils.mkdir_p(queue_path + '/que') @@ -119,10 +145,10 @@ def self.create(options, config_path=nil) else # Write config to dir File.open(queue_path + '/config.json', "w") do |f| - f.write(options.to_json) + f.write(config.to_json) end end - RQ::Queue.start_process(options) + RQ::Queue.start_process(config) end def self.start_process(options) @@ -357,10 +383,8 @@ def run_queue_script!(msg) ENV["RQ_FORCE_REMOTE"] = "1" if msg['force_remote'] # Set env vars specified in queue config file - if @config.env_vars - @config.env_vars.each do |varname, value| - ENV[varname] = value unless varname.match(/^RQ_/) # Don't let the config override RQ-specific env vars though - end + @config.env_vars.each do |varname, value| + ENV[varname] = value unless varname.match(/^RQ_/) # Don't let the config override RQ-specific env vars though end # unset RUBYOPT so it doesn't reinitialize the client ruby's GEM_HOME, etc. @@ -435,30 +459,44 @@ def load_rq_config def load_config data = File.read(File.join(@queue_path, 'config.json')) - @config = sublimate_config(JSON.parse(data)) + @config = self.class.sublimate_config(JSON.parse(data)) end # There are a variety of ways we used to specify truth. # Going forward, javascript true / false are preferred. - def so_truthy? fudge + def self.so_truthy? fudge !!([true, 'true', 'yes', '1', 1].include? fudge) end - def sublimate_config(conf) + def self.sublimate_config(conf) # TODO config validation new_config = QueueConfig.new new_config.name = conf['name'] new_config.script = conf['script'] new_config.num_workers = conf['num_workers'].to_i new_config.exec_prefix = conf['exec_prefix'] - new_config.env_vars = conf['env_vars'] + + if conf['env_vars'].is_a? Hash + new_config.env_vars = conf['env_vars'] + else + new_config.env_vars = {} + end + if so_truthy?(conf['coalesce']) # Convert from old style coalesce / coalesce_paramN new_config.coalesce_params = (1..4).map{ |x| x if so_truthy?(conf["coalesce_param#{x}"]) }.compact + elsif conf['coalesce_params'].is_a? Array + new_config.coalesce_params = conf['coalesce_params'].map(&:to_i) + else + new_config.coalesce_params = [] + end + + if conf['blocking_params'].is_a? Array + new_config.blocking_params = conf['blocking_params'].map(&:to_i) else - new_config.coalesce_params = conf['coalesce_params'] + new_config.blocking_params = [] end - new_config.blocking_params = conf['blocking_params'] + new_config.schedule = (conf['schedule'] || []).map do |s| begin { diff --git a/test/test_queues_setup.rb b/test/test_queues_setup.rb index 8d2e265..eb78936 100755 --- a/test/test_queues_setup.rb +++ b/test/test_queues_setup.rb @@ -40,9 +40,10 @@ def test_key(que_name, under_test, key, expected) #{"script":"./test/test_script.sh","name":"test","url":"http://127.0.0.1:3333/","num_workers":"1"} # test_key 'test', result, 'name', 'test' -test_key 'test', result, 'num_workers', '1' +test_key 'test', result, 'num_workers', 1 test_key 'test', result, 'script', './test/test_script.sh' -test_key 'test', result, 'coalesce', 'no' +test_key 'test', result, 'coalesce_params', [] +test_key 'test', result, 'blocking_params', [] test_key 'test', result, 'exec_prefix', '' @@ -59,8 +60,8 @@ def test_key(que_name, under_test, key, expected) result = JSON.parse(res.body) test_key 'test_coalesce', result, 'name', 'test_coalesce' -test_key 'test_coalesce', result, 'num_workers', '1' +test_key 'test_coalesce', result, 'num_workers', 1 test_key 'test_coalesce', result, 'script', './test/test_script.sh' test_key 'test_coalesce', result, 'exec_prefix', '' -test_key 'test_coalesce', result, 'coalesce_params', [] +test_key 'test_coalesce', result, 'coalesce_params', [1] test_key 'test_coalesce', result, 'blocking_params', []