Skip to content

Commit

Permalink
Style and whitespace
Browse files Browse the repository at this point in the history
  • Loading branch information
sodabrew committed Jul 30, 2013
1 parent 0328c0c commit 07a0164
Show file tree
Hide file tree
Showing 10 changed files with 57 additions and 105 deletions.
35 changes: 14 additions & 21 deletions code/hashdir.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,35 +28,31 @@ def self.entries_int(path, level, accum, limit = nil)
if level == 0
# YYYYMMDD
ents1 = Dir.glob("#{path}/[0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9]")
ents1.sort.reverse.each {
|e|
ents1.sort.reverse.each do |e|
self.entries_int(e, 1, accum, limit)
break if limit && accum.length == limit
}
end
elsif level == 1
# HH
ents1 = Dir.glob("#{path}/[0-9][0-9]")
ents1.sort.reverse.each {
|e|
ents1.sort.reverse.each do |e|
self.entries_int(e, 2, accum, limit)
break if limit && accum.length == limit
}
end
elsif level == 2
# MM
ents1 = Dir.glob("#{path}/[0-9][0-9]")
ents1.sort.reverse.each {
|e|
ents1.sort.reverse.each do |e|
self.entries_int(e, 3, accum, limit)
break if limit && accum.length == limit
}
end
elsif level == 3
# MESG-ID
ents1 = Dir.glob("#{path}/[0-9][0-9]*")
ents1.sort.reverse.each {
|e|
ents1.sort.reverse.each do |e|
accum << e.split('/').last
break if limit && accum.length == limit
}
end
end
accum
end
Expand All @@ -77,24 +73,21 @@ def self.num_entries_int(path, level)
if level == 0
# YYYYMMDD
ents1 = Dir.glob("#{path}/[0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9]")
ents1.sort.reverse.each {
|e|
ents1.sort.reverse.each do |e|
sum += self.num_entries_int(e, 1)
}
end
elsif level == 1
# HH
ents1 = Dir.glob("#{path}/[0-9][0-9]")
ents1.sort.reverse.each {
|e|
ents1.sort.reverse.each do |e|
sum += self.num_entries_int(e, 2)
}
end
elsif level == 2
# MM
ents1 = Dir.glob("#{path}/[0-9][0-9]")
ents1.sort.reverse.each {
|e|
ents1.sort.reverse.each do |e|
sum += self.num_entries_int(e, 3)
}
end
elsif level == 3
# MESG-ID
ents1 = Dir.glob("#{path}/[0-9][0-9]*")
Expand Down
4 changes: 1 addition & 3 deletions code/htmlutils.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,7 @@ def process_chunk(text)

nums = codes.split(';')

nums.each do
|num_str|

nums.each do |num_str|
num = num_str.to_i

if num == 0
Expand Down
5 changes: 1 addition & 4 deletions code/message.rb
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@

require 'socket'
require 'json'

module RQ
class Message < Struct.new(:msg_id, :status, :dest, :src, :param1, :param2, :param3, :param4)

def initialize(options = { }})
def initialize(options={})
end

def init_with_opts(options)
Expand All @@ -21,5 +20,3 @@ def init_with_opts(options)

end
end


57 changes: 19 additions & 38 deletions code/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -91,17 +91,15 @@ def self.create(options,config_path=nil)
File.symlink(old_path, queue_path + '/config.json')
else
# Write config to dir
File.open(queue_path + '/config.json', "w") do
|f|
File.open(queue_path + '/config.json', "w") do |f|
f.write(options.to_json)
end
end
RQ::Queue.start_process(options)
end

def self.log(path, mesg)
File.open(path + '/queue.log', "a") do
|f|
File.open(path + '/queue.log', "a") do |f|
f.write("#{Process.pid} - #{Time.now} - #{mesg}\n")
end
end
Expand Down Expand Up @@ -356,8 +354,7 @@ def run_queue_script!(msg)
def init_socket
# Show pid
File.unlink(@queue_path + '/queue.pid') rescue nil
File.open(@queue_path + '/queue.pid', "w") do
|f|
File.open(@queue_path + '/queue.pid', "w") do |f|
f.write("#{Process.pid}\n")
end

Expand Down Expand Up @@ -447,8 +444,7 @@ def check_msg(msg, input)

# Copy only these keys from input message
keys = %w(src param1 param2 param3 param4 post_run_webhook due force_remote)
keys.each do
|key|
keys.each do |key|
next unless input.has_key?(key)
msg[key] = input[key]
end
Expand Down Expand Up @@ -521,8 +517,7 @@ def is_duplicate?(msg1, msg2)
# Put all of its dups into the done state
def handle_dups_done(msg, new_state)
if msg['dups']
msg['dups'].each {
|i|
msg['dups'].each do |i|
h = @temp_que_dups.delete(i)
new_status = "duplicate #{gen_full_msg_id(msg)}"
write_msg_status(i, new_status, 'que')
Expand All @@ -532,20 +527,19 @@ def handle_dups_done(msg, new_state)
# TODO: refactor this
basename = "#{@queue_path}/que/#{i}"
RQ::HashDir.inject(basename, "#{@queue_path}/#{new_state}", i)
}
end
msg['dups'] = msg['dups'].map { |i| gen_full_msg_id({'msg_id' => i}) }
end
end

# Handle a message that doesn't succeed
def handle_dups_fail(msg)
if msg['dups']
msg['dups'].each {
|i|
msg['dups'].each do |i|
h = @temp_que_dups.delete(i)
h.delete('dup')
@que.unshift(h)
}
end
msg.delete('dups')
end
end
Expand Down Expand Up @@ -584,8 +578,7 @@ def copy_and_clean_msg(input, new_dest = nil)

# Copy only these keys from input message
keys = %w(src param1 param2 param3 param4 post_run_webhook due)
keys.each do
|key|
keys.each do |key|
next unless input.has_key?(key)
msg[key] = input[key]
end
Expand Down Expand Up @@ -726,8 +719,7 @@ def clone_msg(msg)
new_attach_path = new_basename + '/attach/'
Dir.mkdir(new_attach_path)

ents.each do
|ent|
ents.each do |ent|
# Now clone attachments by hard_linking to them in new message
new_path = new_attach_path + ent
old_path = old_attach_path + ent
Expand Down Expand Up @@ -775,8 +767,7 @@ def get_message(params, state,
ents = Dir.entries(basename + "/attach/").reject {|i| i.index('.') == 0 }
if not ents.empty?
msg['_attachments'] = { }
ents.each do
|ent|
ents.each do |ent|
msg['_attachments'][ent] = { }
path = "#{basename}/attach/#{ent}"
md5, size = file_md5(path)
Expand Down Expand Up @@ -951,8 +942,7 @@ def load_messages
# run msgs just put back into que
basename = @queue_path + '/run/'
messages = Dir.entries(basename).reject {|i| i.index('.') == 0 }
messages.each do
|mname|
messages.each do |mname|
begin
File.rename(basename + mname, @queue_path + '/que/' + mname)
rescue
Expand All @@ -967,8 +957,7 @@ def load_messages

messages.sort!.reverse!

messages.each do
|mname|
messages.each do |mname|
begin
data = File.read(basename + mname + "/msg")
msg = JSON.parse(data)
Expand Down Expand Up @@ -1018,8 +1007,7 @@ def handle_status_read(msg)

child_msgs = data.split("\n")

child_msgs.each do
|child_msg|
child_msgs.each do |child_msg|
parts = child_msg.split(" ", 2)

# Always write message status
Expand Down Expand Up @@ -1123,15 +1111,14 @@ def handle_status_read(msg)
# The short msg_id
que_msg_id = result[1][/\/q\/[^\/]+\/([^\/]+)/, 1]

attachments.each {
|path|
attachments.each do |path|
r2 = qc.attach_message({'msg_id' => que_msg_id, 'pathname' => path})
if r2[0] != 'ok'
log("#{@name}:#{Process.pid} couldn't DUP message - #{r2[1]}")
msg['child_write_pipe'].syswrite("fail dup failed - attach fail #{r2[1]}\n")
return
end
}
end

r3 = qc.commit_message({'msg_id' => que_msg_id})
if r3[0] != 'ok'
Expand Down Expand Up @@ -1188,8 +1175,7 @@ def remove_msg_process_id(msg_id, state = 'run')
end

def log(mesg)
File.open(@queue_path + '/queue.log', "a") do
|f|
File.open(@queue_path + '/queue.log', "a") do |f|
f.write("#{Process.pid} - #{Time.now} - #{mesg}\n")
end
end
Expand All @@ -1210,8 +1196,7 @@ def run_scheduler!
end

# Are we arleady running max workers
active_count = @run.inject(0) do
|acc,o|
active_count = @run.inject(0) do |acc, o|
if o.has_key?('child_pid')
acc = acc + 1
end
Expand Down Expand Up @@ -1258,9 +1243,6 @@ def run_scheduler!
end

def run_loop

# Keep this here, cruft loves crufty company
require 'fcntl'
flag = File::NONBLOCK
if defined?(Fcntl::F_GETFL)
flag |= @sock.fcntl(Fcntl::F_GETFL)
Expand Down Expand Up @@ -1419,8 +1401,7 @@ def run_loop
if ['err', 'done', 'relayed'].include? new_state
# Send a webhook if there is a web hook
if msg.include? 'post_run_webhook'
msg['post_run_webhook'].each do
|wh|
msg['post_run_webhook'].each do |wh|
webhook_message(wh, msg, new_state)
end
end
Expand Down
20 changes: 9 additions & 11 deletions code/queuemgr.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@
require 'version'

def log(mesg)
File.open('log/queuemgr.log', "a") do
|f|
File.open('log/queuemgr.log', "a") do |f|
f.write("#{Process.pid} - #{Time.now} - #{mesg}\n")
end
end
Expand Down Expand Up @@ -140,7 +139,7 @@ def handle_request(sock)
if data[0].index('create_queue ') == 0
json = data[0].split(' ', 2)[1]
options = JSON.parse(json)
# "queue"=>{"name"=>"local", "script"=>"local.rb", "ordering"=>"ordered", "fsync"=>"fsync", "num_workers"=>"1", }}
# "queue"=>{"name"=>"local", "script"=>"local.rb", "ordering"=>"ordered", "fsync"=>"fsync", "num_workers"=>"1", }}

if @queues.any? { |q| q.name == options['name'] }
resp = ['fail', 'already created'].to_json
Expand Down Expand Up @@ -313,7 +312,7 @@ def handle_request(sock)

def shutdown
final_shutdown! if @queues.empty?

# Remove non-running entries
@queues = @queues.select { |q| q.pid }

Expand Down Expand Up @@ -377,7 +376,7 @@ def start_scheduler

def load_queues
queues = Dir.entries('queue').reject {|i| i.include? '.'}

queues.each do |q|
start_queue q
end
Expand Down Expand Up @@ -438,7 +437,7 @@ def run_loop
rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::ECONNABORTED, Errno::EPROTO, Errno::EINTR
log('error acception on main sock, supposed to be readysleeping')
end
# Linux Doesn't inherit and BSD does... recomended behavior is to set again
# Linux Doesn't inherit and BSD does... recomended behavior is to set again
flag = 0xffffffff ^ File::NONBLOCK
if defined?(Fcntl::F_GETFL)
flag &= client_socket.fcntl(Fcntl::F_GETFL)
Expand All @@ -448,14 +447,13 @@ def run_loop
qmgr.handle_request(client_socket)
else
# probably a child pipe that closed
worker = qmgr.queues.find {
|i|
worker = qmgr.queues.find do |i|
if i.child_write_pipe
i.child_write_pipe.fileno == io.fileno
else
false
end
}
end
if worker
res = Process.wait2(worker.pid, Process::WNOHANG)
if res
Expand All @@ -467,7 +465,7 @@ def run_loop
# would really like a timer on the event loop so I can sleep a sec, but
# whatever
#
# If queue.rb code fails/exits
# If queue.rb code fails/exits
if worker.num_restarts >= 11
worker.status = "ERROR"
worker.pid = nil
Expand Down Expand Up @@ -497,7 +495,7 @@ def run_loop
else
log("VERY STRANGE: got a read ready on an io that we don't track!")
end

end
end

Expand Down
Loading

0 comments on commit 07a0164

Please sign in to comment.