From 07a0164d8f801fbb5b07a7189388f1eb5e6a1e93 Mon Sep 17 00:00:00 2001 From: Aaron Stone Date: Mon, 29 Jul 2013 22:14:33 -0700 Subject: [PATCH] Style and whitespace --- code/hashdir.rb | 35 ++++++++++-------------- code/htmlutils.rb | 4 +-- code/message.rb | 5 +--- code/queue.rb | 57 +++++++++++++-------------------------- code/queuemgr.rb | 20 +++++++------- code/rq.rb | 18 +++++-------- code/rule_processor.rb | 9 +++---- code/test/env_var_test.rb | 3 +-- code/test/test_script.rb | 5 +--- code/unixrack.rb | 6 ++--- 10 files changed, 57 insertions(+), 105 deletions(-) diff --git a/code/hashdir.rb b/code/hashdir.rb index c2baf23..528804b 100644 --- a/code/hashdir.rb +++ b/code/hashdir.rb @@ -28,35 +28,31 @@ def self.entries_int(path, level, accum, limit = nil) if level == 0 # YYYYMMDD ents1 = Dir.glob("#{path}/[0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9]") - ents1.sort.reverse.each { - |e| + ents1.sort.reverse.each do |e| self.entries_int(e, 1, accum, limit) break if limit && accum.length == limit - } + end elsif level == 1 # HH ents1 = Dir.glob("#{path}/[0-9][0-9]") - ents1.sort.reverse.each { - |e| + ents1.sort.reverse.each do |e| self.entries_int(e, 2, accum, limit) break if limit && accum.length == limit - } + end elsif level == 2 # MM ents1 = Dir.glob("#{path}/[0-9][0-9]") - ents1.sort.reverse.each { - |e| + ents1.sort.reverse.each do |e| self.entries_int(e, 3, accum, limit) break if limit && accum.length == limit - } + end elsif level == 3 # MESG-ID ents1 = Dir.glob("#{path}/[0-9][0-9]*") - ents1.sort.reverse.each { - |e| + ents1.sort.reverse.each do |e| accum << e.split('/').last break if limit && accum.length == limit - } + end end accum end @@ -77,24 +73,21 @@ def self.num_entries_int(path, level) if level == 0 # YYYYMMDD ents1 = Dir.glob("#{path}/[0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9]") - ents1.sort.reverse.each { - |e| + ents1.sort.reverse.each do |e| sum += self.num_entries_int(e, 1) - } + end elsif level == 1 # HH ents1 = Dir.glob("#{path}/[0-9][0-9]") - ents1.sort.reverse.each { - |e| + ents1.sort.reverse.each do |e| sum += self.num_entries_int(e, 2) - } + end elsif level == 2 # MM ents1 = Dir.glob("#{path}/[0-9][0-9]") - ents1.sort.reverse.each { - |e| + ents1.sort.reverse.each do |e| sum += self.num_entries_int(e, 3) - } + end elsif level == 3 # MESG-ID ents1 = Dir.glob("#{path}/[0-9][0-9]*") diff --git a/code/htmlutils.rb b/code/htmlutils.rb index 319c1fa..d8c446c 100644 --- a/code/htmlutils.rb +++ b/code/htmlutils.rb @@ -66,9 +66,7 @@ def process_chunk(text) nums = codes.split(';') - nums.each do - |num_str| - + nums.each do |num_str| num = num_str.to_i if num == 0 diff --git a/code/message.rb b/code/message.rb index 8fcf220..24e8fc6 100644 --- a/code/message.rb +++ b/code/message.rb @@ -1,11 +1,10 @@ - require 'socket' require 'json' module RQ class Message < Struct.new(:msg_id, :status, :dest, :src, :param1, :param2, :param3, :param4) - def initialize(options = { }}) + def initialize(options={}) end def init_with_opts(options) @@ -21,5 +20,3 @@ def init_with_opts(options) end end - - diff --git a/code/queue.rb b/code/queue.rb index 1e9cc9b..ff6542d 100644 --- a/code/queue.rb +++ b/code/queue.rb @@ -91,8 +91,7 @@ def self.create(options,config_path=nil) File.symlink(old_path, queue_path + '/config.json') else # Write config to dir - File.open(queue_path + '/config.json', "w") do - |f| + File.open(queue_path + '/config.json', "w") do |f| f.write(options.to_json) end end @@ -100,8 +99,7 @@ def self.create(options,config_path=nil) end def self.log(path, mesg) - File.open(path + '/queue.log', "a") do - |f| + File.open(path + '/queue.log', "a") do |f| f.write("#{Process.pid} - #{Time.now} - #{mesg}\n") end end @@ -356,8 +354,7 @@ def run_queue_script!(msg) def init_socket # Show pid File.unlink(@queue_path + '/queue.pid') rescue nil - File.open(@queue_path + '/queue.pid', "w") do - |f| + File.open(@queue_path + '/queue.pid', "w") do |f| f.write("#{Process.pid}\n") end @@ -447,8 +444,7 @@ def check_msg(msg, input) # Copy only these keys from input message keys = %w(src param1 param2 param3 param4 post_run_webhook due force_remote) - keys.each do - |key| + keys.each do |key| next unless input.has_key?(key) msg[key] = input[key] end @@ -521,8 +517,7 @@ def is_duplicate?(msg1, msg2) # Put all of its dups into the done state def handle_dups_done(msg, new_state) if msg['dups'] - msg['dups'].each { - |i| + msg['dups'].each do |i| h = @temp_que_dups.delete(i) new_status = "duplicate #{gen_full_msg_id(msg)}" write_msg_status(i, new_status, 'que') @@ -532,7 +527,7 @@ def handle_dups_done(msg, new_state) # TODO: refactor this basename = "#{@queue_path}/que/#{i}" RQ::HashDir.inject(basename, "#{@queue_path}/#{new_state}", i) - } + end msg['dups'] = msg['dups'].map { |i| gen_full_msg_id({'msg_id' => i}) } end end @@ -540,12 +535,11 @@ def handle_dups_done(msg, new_state) # Handle a message that doesn't succeed def handle_dups_fail(msg) if msg['dups'] - msg['dups'].each { - |i| + msg['dups'].each do |i| h = @temp_que_dups.delete(i) h.delete('dup') @que.unshift(h) - } + end msg.delete('dups') end end @@ -584,8 +578,7 @@ def copy_and_clean_msg(input, new_dest = nil) # Copy only these keys from input message keys = %w(src param1 param2 param3 param4 post_run_webhook due) - keys.each do - |key| + keys.each do |key| next unless input.has_key?(key) msg[key] = input[key] end @@ -726,8 +719,7 @@ def clone_msg(msg) new_attach_path = new_basename + '/attach/' Dir.mkdir(new_attach_path) - ents.each do - |ent| + ents.each do |ent| # Now clone attachments by hard_linking to them in new message new_path = new_attach_path + ent old_path = old_attach_path + ent @@ -775,8 +767,7 @@ def get_message(params, state, ents = Dir.entries(basename + "/attach/").reject {|i| i.index('.') == 0 } if not ents.empty? msg['_attachments'] = { } - ents.each do - |ent| + ents.each do |ent| msg['_attachments'][ent] = { } path = "#{basename}/attach/#{ent}" md5, size = file_md5(path) @@ -951,8 +942,7 @@ def load_messages # run msgs just put back into que basename = @queue_path + '/run/' messages = Dir.entries(basename).reject {|i| i.index('.') == 0 } - messages.each do - |mname| + messages.each do |mname| begin File.rename(basename + mname, @queue_path + '/que/' + mname) rescue @@ -967,8 +957,7 @@ def load_messages messages.sort!.reverse! - messages.each do - |mname| + messages.each do |mname| begin data = File.read(basename + mname + "/msg") msg = JSON.parse(data) @@ -1018,8 +1007,7 @@ def handle_status_read(msg) child_msgs = data.split("\n") - child_msgs.each do - |child_msg| + child_msgs.each do |child_msg| parts = child_msg.split(" ", 2) # Always write message status @@ -1123,15 +1111,14 @@ def handle_status_read(msg) # The short msg_id que_msg_id = result[1][/\/q\/[^\/]+\/([^\/]+)/, 1] - attachments.each { - |path| + attachments.each do |path| r2 = qc.attach_message({'msg_id' => que_msg_id, 'pathname' => path}) if r2[0] != 'ok' log("#{@name}:#{Process.pid} couldn't DUP message - #{r2[1]}") msg['child_write_pipe'].syswrite("fail dup failed - attach fail #{r2[1]}\n") return end - } + end r3 = qc.commit_message({'msg_id' => que_msg_id}) if r3[0] != 'ok' @@ -1188,8 +1175,7 @@ def remove_msg_process_id(msg_id, state = 'run') end def log(mesg) - File.open(@queue_path + '/queue.log', "a") do - |f| + File.open(@queue_path + '/queue.log', "a") do |f| f.write("#{Process.pid} - #{Time.now} - #{mesg}\n") end end @@ -1210,8 +1196,7 @@ def run_scheduler! end # Are we arleady running max workers - active_count = @run.inject(0) do - |acc,o| + active_count = @run.inject(0) do |acc, o| if o.has_key?('child_pid') acc = acc + 1 end @@ -1258,9 +1243,6 @@ def run_scheduler! end def run_loop - - # Keep this here, cruft loves crufty company - require 'fcntl' flag = File::NONBLOCK if defined?(Fcntl::F_GETFL) flag |= @sock.fcntl(Fcntl::F_GETFL) @@ -1419,8 +1401,7 @@ def run_loop if ['err', 'done', 'relayed'].include? new_state # Send a webhook if there is a web hook if msg.include? 'post_run_webhook' - msg['post_run_webhook'].each do - |wh| + msg['post_run_webhook'].each do |wh| webhook_message(wh, msg, new_state) end end diff --git a/code/queuemgr.rb b/code/queuemgr.rb index d818a27..b8b1a7e 100644 --- a/code/queuemgr.rb +++ b/code/queuemgr.rb @@ -8,8 +8,7 @@ require 'version' def log(mesg) - File.open('log/queuemgr.log', "a") do - |f| + File.open('log/queuemgr.log', "a") do |f| f.write("#{Process.pid} - #{Time.now} - #{mesg}\n") end end @@ -140,7 +139,7 @@ def handle_request(sock) if data[0].index('create_queue ') == 0 json = data[0].split(' ', 2)[1] options = JSON.parse(json) - # "queue"=>{"name"=>"local", "script"=>"local.rb", "ordering"=>"ordered", "fsync"=>"fsync", "num_workers"=>"1", }} + # "queue"=>{"name"=>"local", "script"=>"local.rb", "ordering"=>"ordered", "fsync"=>"fsync", "num_workers"=>"1", }} if @queues.any? { |q| q.name == options['name'] } resp = ['fail', 'already created'].to_json @@ -313,7 +312,7 @@ def handle_request(sock) def shutdown final_shutdown! if @queues.empty? - + # Remove non-running entries @queues = @queues.select { |q| q.pid } @@ -377,7 +376,7 @@ def start_scheduler def load_queues queues = Dir.entries('queue').reject {|i| i.include? '.'} - + queues.each do |q| start_queue q end @@ -438,7 +437,7 @@ def run_loop rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::ECONNABORTED, Errno::EPROTO, Errno::EINTR log('error acception on main sock, supposed to be readysleeping') end - # Linux Doesn't inherit and BSD does... recomended behavior is to set again + # Linux Doesn't inherit and BSD does... recomended behavior is to set again flag = 0xffffffff ^ File::NONBLOCK if defined?(Fcntl::F_GETFL) flag &= client_socket.fcntl(Fcntl::F_GETFL) @@ -448,14 +447,13 @@ def run_loop qmgr.handle_request(client_socket) else # probably a child pipe that closed - worker = qmgr.queues.find { - |i| + worker = qmgr.queues.find do |i| if i.child_write_pipe i.child_write_pipe.fileno == io.fileno else false end - } + end if worker res = Process.wait2(worker.pid, Process::WNOHANG) if res @@ -467,7 +465,7 @@ def run_loop # would really like a timer on the event loop so I can sleep a sec, but # whatever # - # If queue.rb code fails/exits + # If queue.rb code fails/exits if worker.num_restarts >= 11 worker.status = "ERROR" worker.pid = nil @@ -497,7 +495,7 @@ def run_loop else log("VERY STRANGE: got a read ready on an io that we don't track!") end - + end end diff --git a/code/rq.rb b/code/rq.rb index fc920b3..76be6cf 100644 --- a/code/rq.rb +++ b/code/rq.rb @@ -78,8 +78,7 @@ def cmd_sendmesg(args) # Construct message mesg = {} keys = %w(dest src count max_count param1 param2 param3 param4 due force_remote) - keys.each do - |key| + keys.each do |key| next unless args.has_key?(key) mesg[key] = args[key] end @@ -102,8 +101,7 @@ def cmd_prepmesg(args) # Construct message mesg = {} keys = %w(dest src count max_count param1 param2 param3 param4 due force_remote) - keys.each do - |key| + keys.each do |key| next unless args.has_key?(key) mesg[key] = args[key] end @@ -133,8 +131,7 @@ def cmd_attachmesg(args) # Construct message for queue mgr msg = {'msg_id' => msg_id} keys = %w(pathname name local_fs_only) - keys.each do - |key| + keys.each do |key| next unless args.has_key?(key) msg[key] = args[key] end @@ -239,8 +236,7 @@ def cmd_single_que(args) # Construct message mesg = {} keys = %w(dest src count max_count param1 param2 param3 param4 due force_remote) - keys.each do - |key| + keys.each do |key| next unless args.has_key?(key) mesg[key] = args[key] end @@ -264,14 +260,12 @@ def cmd_attachstatusmesg(args) if result[0] == 'ok' ents = [] if result[1].has_key?('_attachments') - result[1]['_attachments'].each do - |k,v| + result[1]['_attachments'].each do |k,v| ents << [k, v['md5'], v['size'], v['path']] end end print "#{result[0]} #{ents.length}\n" - ents.each do - |ent| + ents.each do |ent| print "#{ent[0]} #{ent[1]} #{ent[2]} #{ent[3]}\n" end else diff --git a/code/rule_processor.rb b/code/rule_processor.rb index f389337..f222598 100755 --- a/code/rule_processor.rb +++ b/code/rule_processor.rb @@ -176,16 +176,13 @@ def self.process_pathname(path, verbose=false) rules = [] begin lines = [] - File.open(path) do - |f| + File.open(path) do |f| lines = f.readlines end in_rule = false rule = nil - lines.each_with_index { - |line, i| - + lines.each_with_index do |line, i| i = i + 1 # i is offset by 0, so we bump it up for human readable line #s next if line[0..1] == "#" @@ -206,7 +203,7 @@ def self.process_pathname(path, verbose=false) rules << rule rule.process(line, i, verbose) end - } + end rescue Errno::ENOENT => ex return nil rescue StandardError => ex diff --git a/code/test/env_var_test.rb b/code/test/env_var_test.rb index d92e79c..42a6420 100755 --- a/code/test/env_var_test.rb +++ b/code/test/env_var_test.rb @@ -27,8 +27,7 @@ def test_queue_message # Wait for it to change state result = {} - 20.times do - |i| + 20.times do |i| sleep 0.2 uri_str = "#{msg_id}.json" res = Net::HTTP.get_response(URI.parse(uri_str)) diff --git a/code/test/test_script.rb b/code/test/test_script.rb index bf80cec..36e3fbf 100755 --- a/code/test/test_script.rb +++ b/code/test/test_script.rb @@ -1,10 +1,7 @@ #!/usr/bin/env ruby - -# def log(mesg) - File.open('relay.log', "a") do - |f| + File.open('relay.log', "a") do |f| f.write("#{Process.pid} - #{Time.now} - #{mesg}\n") end end diff --git a/code/unixrack.rb b/code/unixrack.rb index 8f33b88..3390603 100644 --- a/code/unixrack.rb +++ b/code/unixrack.rb @@ -7,6 +7,7 @@ require 'rack/handler' end require 'time' +require 'socket' require 'stringio' # Thx - Logan Capaldo @@ -257,8 +258,7 @@ def self.send_response!(sock, status, method, url, headers, body) headers['Connection'] ||= 'close' - headers.each do - |k, vs| + headers.each do |k, vs| vs.split("\n").each { |v| hdr_ary << ["#{k}: #{v}"] } end @@ -277,8 +277,6 @@ def self.send_response!(sock, status, method, url, headers, body) def self.run(app, options={}) - - require 'socket' port = options[:Port] || 8080 host = options[:Hostname] || 'localhost' listen = options[:Host] || '127.0.0.1'