Skip to content

Commit

Permalink
Use File.join and case-when in a few more places
Browse files Browse the repository at this point in the history
  • Loading branch information
sodabrew committed Jan 23, 2015
1 parent 8eba974 commit 5f03641
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 57 deletions.
12 changes: 4 additions & 8 deletions code/cleaner_script.rb
Original file line number Diff line number Diff line change
Expand Up @@ -120,14 +120,10 @@ def trim_relay(qpath, num)
return
end

msgs.each do
|ent|

path = RQ::HashDir.path_for(qpath + "/relayed", ent)

# TODO: put progress
#puts "status: removing " + path
#STDOUT.flush
msgs.each do |ent|
path = RQ::HashDir.path_for(qpath, 'relayed', ent)
puts "status: removing " + path
STDOUT.flush
FileUtils.rm_rf(path)
end

Expand Down
8 changes: 4 additions & 4 deletions code/hashdir.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ def self.make(path)
return true
end

def self.exist(path, msg_id)
def self.exist(que_base_path, state, msg_id)
parts = self.msg_id_parts(msg_id)
# parts = [ "YYYYmmDD", "HH", "MM" ]

# If we got bad data, return false
return false unless parts

File.exists?("#{path}/#{parts[0]}/#{parts[1]}/#{parts[2]}/#{msg_id}")
File.exist?("#{que_base_path}/#{state}/#{parts[0]}/#{parts[1]}/#{parts[2]}/#{msg_id}")
end

# Do a DFT traverse in reverse order so most
Expand All @@ -35,9 +35,9 @@ def self.inject(prev_msg_path, new_base_path, msg_id)
File.rename(prev_msg_path, newname)
end

def self.path_for(que_base_path, msg_id)
def self.path_for(que_base_path, state, msg_id)
parts = self.msg_id_parts(msg_id)
"#{que_base_path}/#{parts[0]}/#{parts[1]}/#{parts[2]}/#{msg_id}"
"#{que_base_path}/#{state}/#{parts[0]}/#{parts[1]}/#{parts[2]}/#{msg_id}"
end

private
Expand Down
88 changes: 43 additions & 45 deletions code/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -480,13 +480,14 @@ def alloc_id(msg)
# Simple time insertion system - should work since single threaded
times = 0
z = Time.now.getutc
name = z.strftime("_%Y%m%d.%H%M.%S.") + sprintf("%03d", (z.tv_usec / 1000))
Dir.mkdir(@queue_path + "/prep/" + name)
stat = File.stat(@queue_path + "/prep/" + name)
new_name = z.strftime("%Y%m%d.%H%M.%S.") + sprintf("%03d.%d", (z.tv_usec / 1000), stat.ino)
File.rename(@queue_path + "/prep/" + name, @queue_path + "/prep/" + new_name)
name = z.strftime('_%Y%m%d.%H%M.%S.') + sprintf('%03d', (z.tv_usec / 1000))
prep_name = File.join(@queue_path, 'prep', name)
Dir.mkdir(prep_name)
stat = File.stat(prep_name)
new_name = z.strftime('%Y%m%d.%H%M.%S.') + sprintf('%03d.%d', (z.tv_usec / 1000), stat.ino)
File.rename(prep_name, File.join(@queue_path, 'prep', new_name))
@prep << new_name
msg["msg_id"] = new_name
msg['msg_id'] = new_name
rescue
times += 1
$log.warn("couldn't ALLOC ID times: #{times} [ #{$!} ]")
Expand Down Expand Up @@ -594,8 +595,8 @@ def handle_dups_done(msg, new_state)
h['state'] = new_state
store_msg(h, 'que')
# TODO: refactor this
basename = "#{@queue_path}/que/#{i}"
RQ::HashDir.inject(basename, "#{@queue_path}/#{new_state}", i)
basename = File.join(@queue_path, 'que', i)
RQ::HashDir.inject(basename, File.join(@queue_path, new_state), i)
end
msg['dups'] = msg['dups'].map { |i| gen_full_msg_id({'msg_id' => i}) }
end
Expand Down Expand Up @@ -682,11 +683,10 @@ def run_job(msg, from_state = 'que')

def msg_state_prep?(msg)
msg_id = msg['msg_id']
basename = @queue_path + "/prep/" + msg_id

return false unless @prep.include?(msg_id)

if not File.exists?(basename)
basename = File.join(@queue_path, 'prep', msg_id)
unless File.exists?(basename)
$log.warn("WARNING - serious queue inconsistency #{msg_id}")
$log.warn("WARNING - #{msg_id} in memory but not on disk")
return false
Expand All @@ -704,11 +704,11 @@ def msg_state(msg, options={:consistency => true})
'que'
elsif @run.find { |o| o['msg_id'] == msg_id }
'run'
elsif RQ::HashDir.exist(File.join(@queue_path, 'done'), msg_id)
basename = RQ::HashDir.path_for(File.join(@queue_path, 'done'), msg_id)
elsif RQ::HashDir.exist(@queue_path, 'done', msg_id)
basename = RQ::HashDir.path_for(@queue_path, 'done', msg_id)
'done'
elsif RQ::HashDir.exist(File.join(@queue_path, 'relayed'), msg_id)
basename = RQ::HashDir.path_for(File.join(@queue_path, 'relayed'), msg_id)
elsif RQ::HashDir.exist(@queue_path, 'relayed', msg_id)
basename = RQ::HashDir.path_for(@queue_path, 'relayed', msg_id)
'relayed'
elsif not Dir.glob(File.join(@queue_path, 'err', msg_id)).empty?
'err'
Expand All @@ -728,30 +728,28 @@ def msg_state(msg, options={:consistency => true})
state
end

def kill_msg!(msg)
state = msg_state(msg)
return nil unless state
end

def delete_msg!(msg)
state = msg_state(msg)
return nil unless state
return nil unless msg['msg_id']

basename = @queue_path + "/#{state}/" + msg['msg_id']
basename = File.join(@queue_path, state, msg['msg_id'])

if state == 'prep'
#FileUtils.remove_entry_secure(basename)
# N.B. do not delete a message in 'run' state,
# first kill the process and force the message to err state,
# then delete it from err state.
case state
when 'prep'
FileUtils.rm_rf(basename)
@prep.delete(msg['msg_id'])
end
if state == 'que'
#FileUtils.remove_entry_secure(basename)
when 'que'
FileUtils.rm_rf(basename)
@que.delete_if { |o| o['msg_id'] == msg['msg_id'] }
when 'done'
FileUtils.rm_rf(basename)
when 'err'
FileUtils.rm_rf(basename)
end
# TODO
# run
# done
end

def clone_msg(msg)
Expand All @@ -773,10 +771,10 @@ def clone_msg(msg)

# Now check for, and copy attachments
# Assumes that original message guaranteed attachment integrity
new_basename = @queue_path + "/prep/" + new_msg['msg_id']
new_basename = File.join(@queue_path, 'prep', new_msg['msg_id'])

if File.directory?(old_basename + "/attach/")
ents = Dir.entries(old_basename + "/attach/").reject {|i| i.start_with?('.') }
if File.directory?(old_basename + '/attach/')
ents = Dir.entries(old_basename + '/attach/').reject {|i| i.start_with?('.') }
if not ents.empty?
# simple check for attachment dir
old_attach_path = old_basename + '/attach/'
Expand Down Expand Up @@ -805,9 +803,9 @@ def get_message(params, state,
options={ :read_message => true,
:check_attachments => true})
if ['done', 'relayed'].include? state
basename = RQ::HashDir.path_for("#{@queue_path}/#{state}", params['msg_id'])
basename = RQ::HashDir.path_for(@queue_path, state, params['msg_id'])
else
basename = @queue_path + "/#{state}/" + params['msg_id']
basename = File.join(@queue_path, state, params['msg_id'])
end

msg = nil
Expand Down Expand Up @@ -835,11 +833,11 @@ def get_message(params, state,
msg['_attachments'] = { }
ents.each do |ent|
msg['_attachments'][ent] = { }
path = "#{basename}/attach/#{ent}"
path = File.join(basename, 'attach', ent)
md5, size = file_md5(path)
msg['_attachments'][ent]['md5'] = md5
msg['_attachments'][ent]['size'] = size
msg['_attachments'][ent]['path'] = cwd + '/' + path
msg['_attachments'][ent]['path'] = File.join(cwd, path)
end
end
end
Expand Down Expand Up @@ -878,7 +876,7 @@ def attach_msg(msg)
# validate attachment
result = [false, 'Unknown error']
begin
basename = @queue_path + "/prep/" + msg_id
basename = File.join(@queue_path, 'prep', msg_id)
return [false, "No message on disk"] unless File.exists? basename

#TODO: deal with symlinks
Expand Down Expand Up @@ -950,7 +948,7 @@ def del_attach_msg(msg)
# validate attachment
result = [false, 'Unknown error']
begin
basename = @queue_path + "/prep/" + msg_id
basename = File.join(@queue_path, 'prep', msg_id)
return [false, "No message on disk"] unless File.exists? basename

# simple check for attachment dir
Expand Down Expand Up @@ -1096,7 +1094,7 @@ def handle_status_read(msg)
msg_copy = copy_and_clean_msg(msg, new_dest)
msg_copy['due'] = new_due

basename = @queue_path + "/run/" + msg['msg_id']
basename = File.join(@queue_path, 'run', msg['msg_id'])

# Now see if there are any attachments
attachments = []
Expand Down Expand Up @@ -1159,7 +1157,7 @@ def write_msg_status(msg_id, mesg, state = 'run')
# Write message to disk
begin
data = { 'job_status' => mesg }.to_json
basename = @queue_path + "/#{state}/" + msg_id + "/status"
basename = File.join(@queue_path, state, msg_id, 'status')
File.open(basename + '.tmp', 'w') { |f| f.write(data) }
File.rename(basename + '.tmp', basename)
rescue
Expand All @@ -1171,15 +1169,15 @@ def write_msg_status(msg_id, mesg, state = 'run')
end

def read_msg_process_id(msg_id)
basename = "#{@queue_path}/run/#{msg_id}/pid"
basename = File.join(@queue_path, 'run', msg_id, 'pid')
File.read(basename).to_i
rescue
nil
end

# Write message pid to disk
def write_msg_process_id(msg_id, pid)
basename = "#{@queue_path}/run/#{msg_id}/pid"
basename = File.join(@queue_path, 'run', msg_id, 'pid')
File.open(basename + '.tmp', 'w') { |f| f.write(pid.to_s) }
File.rename(basename + '.tmp', basename)
true
Expand All @@ -1189,7 +1187,7 @@ def write_msg_process_id(msg_id, pid)
end

def remove_msg_process_id(msg_id, state = 'run')
basename = "#{@queue_path}/run/#{msg_id}/pid"
basename = File.join(@queue_path, 'run', msg_id, 'pid')
FileUtils.rm_rf(basename)
end

Expand Down Expand Up @@ -1434,13 +1432,13 @@ def handle_child_close(msg, exit_status=nil)
# store message since it made it to done and we want the 'dups' field to live
handle_dups_done(msg, new_state)
store_msg(msg, 'run')
RQ::HashDir.inject(basename, "#{@queue_path}/#{new_state}", msg_id)
RQ::HashDir.inject(basename, File.join(@queue_path, new_state), msg_id)
else
# store message since it failed to make it to done and
# we want the 'dups' field to be removed
handle_dups_fail(msg)
store_msg(msg, 'run')
newname = "#{@queue_path}/#{new_state}/#{msg_id}"
newname = File.join(@queue_path, new_state, msg_id)
File.rename(basename, newname)
end
rescue
Expand Down

0 comments on commit 5f03641

Please sign in to comment.