From 5f0364115351cc4d5dc4ad1cd3f24d085c9631df Mon Sep 17 00:00:00 2001 From: Aaron Stone Date: Thu, 22 Jan 2015 16:08:14 -0800 Subject: [PATCH] Use File.join and case-when in a few more places --- code/cleaner_script.rb | 12 ++---- code/hashdir.rb | 8 ++-- code/queue.rb | 88 +++++++++++++++++++++--------------------- 3 files changed, 51 insertions(+), 57 deletions(-) diff --git a/code/cleaner_script.rb b/code/cleaner_script.rb index f9ab608..d9662f0 100755 --- a/code/cleaner_script.rb +++ b/code/cleaner_script.rb @@ -120,14 +120,10 @@ def trim_relay(qpath, num) return end - msgs.each do - |ent| - - path = RQ::HashDir.path_for(qpath + "/relayed", ent) - - # TODO: put progress - #puts "status: removing " + path - #STDOUT.flush + msgs.each do |ent| + path = RQ::HashDir.path_for(qpath, 'relayed', ent) + puts "status: removing " + path + STDOUT.flush FileUtils.rm_rf(path) end diff --git a/code/hashdir.rb b/code/hashdir.rb index 6795d91..1b614a7 100644 --- a/code/hashdir.rb +++ b/code/hashdir.rb @@ -8,14 +8,14 @@ def self.make(path) return true end - def self.exist(path, msg_id) + def self.exist(que_base_path, state, msg_id) parts = self.msg_id_parts(msg_id) # parts = [ "YYYYmmDD", "HH", "MM" ] # If we got bad data, return false return false unless parts - File.exists?("#{path}/#{parts[0]}/#{parts[1]}/#{parts[2]}/#{msg_id}") + File.exist?("#{que_base_path}/#{state}/#{parts[0]}/#{parts[1]}/#{parts[2]}/#{msg_id}") end # Do a DFT traverse in reverse order so most @@ -35,9 +35,9 @@ def self.inject(prev_msg_path, new_base_path, msg_id) File.rename(prev_msg_path, newname) end - def self.path_for(que_base_path, msg_id) + def self.path_for(que_base_path, state, msg_id) parts = self.msg_id_parts(msg_id) - "#{que_base_path}/#{parts[0]}/#{parts[1]}/#{parts[2]}/#{msg_id}" + "#{que_base_path}/#{state}/#{parts[0]}/#{parts[1]}/#{parts[2]}/#{msg_id}" end private diff --git a/code/queue.rb b/code/queue.rb index 9ed3aca..4f62808 100644 --- a/code/queue.rb +++ b/code/queue.rb @@ 
-480,13 +480,14 @@ def alloc_id(msg) # Simple time insertion system - should work since single threaded times = 0 z = Time.now.getutc - name = z.strftime("_%Y%m%d.%H%M.%S.") + sprintf("%03d", (z.tv_usec / 1000)) - Dir.mkdir(@queue_path + "/prep/" + name) - stat = File.stat(@queue_path + "/prep/" + name) - new_name = z.strftime("%Y%m%d.%H%M.%S.") + sprintf("%03d.%d", (z.tv_usec / 1000), stat.ino) - File.rename(@queue_path + "/prep/" + name, @queue_path + "/prep/" + new_name) + name = z.strftime('_%Y%m%d.%H%M.%S.') + sprintf('%03d', (z.tv_usec / 1000)) + prep_name = File.join(@queue_path, 'prep', name) + Dir.mkdir(prep_name) + stat = File.stat(prep_name) + new_name = z.strftime('%Y%m%d.%H%M.%S.') + sprintf('%03d.%d', (z.tv_usec / 1000), stat.ino) + File.rename(prep_name, File.join(@queue_path, 'prep', new_name)) @prep << new_name - msg["msg_id"] = new_name + msg['msg_id'] = new_name rescue times += 1 $log.warn("couldn't ALLOC ID times: #{times} [ #{$!} ]") @@ -594,8 +595,8 @@ def handle_dups_done(msg, new_state) h['state'] = new_state store_msg(h, 'que') # TODO: refactor this - basename = "#{@queue_path}/que/#{i}" - RQ::HashDir.inject(basename, "#{@queue_path}/#{new_state}", i) + basename = File.join(@queue_path, 'que', i) + RQ::HashDir.inject(basename, File.join(@queue_path, new_state), i) end msg['dups'] = msg['dups'].map { |i| gen_full_msg_id({'msg_id' => i}) } end @@ -682,11 +683,10 @@ def run_job(msg, from_state = 'que') def msg_state_prep?(msg) msg_id = msg['msg_id'] - basename = @queue_path + "/prep/" + msg_id - return false unless @prep.include?(msg_id) - if not File.exists?(basename) + basename = File.join(@queue_path, 'prep', msg_id) + unless File.exists?(basename) $log.warn("WARNING - serious queue inconsistency #{msg_id}") $log.warn("WARNING - #{msg_id} in memory but not on disk") return false @@ -704,11 +704,11 @@ def msg_state(msg, options={:consistency => true}) 'que' elsif @run.find { |o| o['msg_id'] == msg_id } 'run' - elsif 
RQ::HashDir.exist(File.join(@queue_path, 'done'), msg_id) - basename = RQ::HashDir.path_for(File.join(@queue_path, 'done'), msg_id) + elsif RQ::HashDir.exist(@queue_path, 'done', msg_id) + basename = RQ::HashDir.path_for(@queue_path, 'done', msg_id) 'done' - elsif RQ::HashDir.exist(File.join(@queue_path, 'relayed'), msg_id) - basename = RQ::HashDir.path_for(File.join(@queue_path, 'relayed'), msg_id) + elsif RQ::HashDir.exist(@queue_path, 'relayed', msg_id) + basename = RQ::HashDir.path_for(@queue_path, 'relayed', msg_id) 'relayed' elsif not Dir.glob(File.join(@queue_path, 'err', msg_id)).empty? 'err' @@ -728,30 +728,28 @@ def msg_state(msg, options={:consistency => true}) state end - def kill_msg!(msg) - state = msg_state(msg) - return nil unless state - end - def delete_msg!(msg) state = msg_state(msg) return nil unless state + return nil unless msg['msg_id'] - basename = @queue_path + "/#{state}/" + msg['msg_id'] + basename = File.join(@queue_path, state, msg['msg_id']) - if state == 'prep' - #FileUtils.remove_entry_secure(basename) + # N.B. do not delete a message in 'run' state, + # first kill the process and force the message to err state, + # then delete it from err state. 
+ case state + when 'prep' FileUtils.rm_rf(basename) @prep.delete(msg['msg_id']) - end - if state == 'que' - #FileUtils.remove_entry_secure(basename) + when 'que' FileUtils.rm_rf(basename) @que.delete_if { |o| o['msg_id'] == msg['msg_id'] } + when 'done' + FileUtils.rm_rf(basename) + when 'err' + FileUtils.rm_rf(basename) end - # TODO - # run - # done end def clone_msg(msg) @@ -773,10 +771,10 @@ def clone_msg(msg) # Now check for, and copy attachments # Assumes that original message guaranteed attachment integrity - new_basename = @queue_path + "/prep/" + new_msg['msg_id'] + new_basename = File.join(@queue_path, 'prep', new_msg['msg_id']) - if File.directory?(old_basename + "/attach/") - ents = Dir.entries(old_basename + "/attach/").reject {|i| i.start_with?('.') } + if File.directory?(old_basename + '/attach/') + ents = Dir.entries(old_basename + '/attach/').reject {|i| i.start_with?('.') } if not ents.empty? # simple check for attachment dir old_attach_path = old_basename + '/attach/' @@ -805,9 +803,9 @@ def get_message(params, state, options={ :read_message => true, :check_attachments => true}) if ['done', 'relayed'].include? 
state - basename = RQ::HashDir.path_for("#{@queue_path}/#{state}", params['msg_id']) + basename = RQ::HashDir.path_for(@queue_path, state, params['msg_id']) else - basename = @queue_path + "/#{state}/" + params['msg_id'] + basename = File.join(@queue_path, state, params['msg_id']) end msg = nil @@ -835,11 +833,11 @@ def get_message(params, state, msg['_attachments'] = { } ents.each do |ent| msg['_attachments'][ent] = { } - path = "#{basename}/attach/#{ent}" + path = File.join(basename, 'attach', ent) md5, size = file_md5(path) msg['_attachments'][ent]['md5'] = md5 msg['_attachments'][ent]['size'] = size - msg['_attachments'][ent]['path'] = cwd + '/' + path + msg['_attachments'][ent]['path'] = File.join(cwd, path) end end end @@ -878,7 +876,7 @@ def attach_msg(msg) # validate attachment result = [false, 'Unknown error'] begin - basename = @queue_path + "/prep/" + msg_id + basename = File.join(@queue_path, 'prep', msg_id) return [false, "No message on disk"] unless File.exists? basename #TODO: deal with symlinks @@ -950,7 +948,7 @@ def del_attach_msg(msg) # validate attachment result = [false, 'Unknown error'] begin - basename = @queue_path + "/prep/" + msg_id + basename = File.join(@queue_path, 'prep', msg_id) return [false, "No message on disk"] unless File.exists? 
basename # simple check for attachment dir @@ -1096,7 +1094,7 @@ def handle_status_read(msg) msg_copy = copy_and_clean_msg(msg, new_dest) msg_copy['due'] = new_due - basename = @queue_path + "/run/" + msg['msg_id'] + basename = File.join(@queue_path, 'run', msg['msg_id']) # Now see if there are any attachments attachments = [] @@ -1159,7 +1157,7 @@ def write_msg_status(msg_id, mesg, state = 'run') # Write message to disk begin data = { 'job_status' => mesg }.to_json - basename = @queue_path + "/#{state}/" + msg_id + "/status" + basename = File.join(@queue_path, state, msg_id, 'status') File.open(basename + '.tmp', 'w') { |f| f.write(data) } File.rename(basename + '.tmp', basename) rescue @@ -1171,7 +1169,7 @@ def write_msg_status(msg_id, mesg, state = 'run') end def read_msg_process_id(msg_id) - basename = "#{@queue_path}/run/#{msg_id}/pid" + basename = File.join(@queue_path, 'run', msg_id, 'pid') File.read(basename).to_i rescue nil @@ -1179,7 +1177,7 @@ def read_msg_process_id(msg_id) # Write message pid to disk def write_msg_process_id(msg_id, pid) - basename = "#{@queue_path}/run/#{msg_id}/pid" + basename = File.join(@queue_path, 'run', msg_id, 'pid') File.open(basename + '.tmp', 'w') { |f| f.write(pid.to_s) } File.rename(basename + '.tmp', basename) true @@ -1189,7 +1187,7 @@ def write_msg_process_id(msg_id, pid) end def remove_msg_process_id(msg_id, state = 'run') - basename = "#{@queue_path}/run/#{msg_id}/pid" + basename = File.join(@queue_path, 'run', msg_id, 'pid') FileUtils.rm_rf(basename) end @@ -1434,13 +1432,13 @@ def handle_child_close(msg, exit_status=nil) # store message since it made it to done and we want the 'dups' field to live handle_dups_done(msg, new_state) store_msg(msg, 'run') - RQ::HashDir.inject(basename, "#{@queue_path}/#{new_state}", msg_id) + RQ::HashDir.inject(basename, File.join(@queue_path, new_state), msg_id) else # store message since it failed to make it to done and # we want the 'dups' field to be removed handle_dups_fail(msg) 
store_msg(msg, 'run') - newname = "#{@queue_path}/#{new_state}/#{msg_id}" + newname = File.join(@queue_path, new_state, msg_id) File.rename(basename, newname) end rescue