forked from brightroll/rq
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrq_router_script.rb
executable file
·141 lines (115 loc) · 3.25 KB
/
rq_router_script.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
#!/usr/bin/env ruby
$:.unshift(File.join(File.dirname(__FILE__), ".."))
require 'vendor/environment'
require 'code/rule_processor'
require 'json'
module Alarm
case RUBY_VERSION.to_f
when 1.8
require 'dl/import'
extend DL::Importable
when 1.9
require 'dl/import'
extend DL::Importer
else # 2.0+
require 'fiddle/import'
extend Fiddle::Importer
end
if RUBY_PLATFORM =~ /darwin/
so_ext = 'dylib'
else
so_ext = 'so.6'
end
dlload "libc.#{so_ext}"
extern "unsigned int alarm(unsigned int)"
end
$rq_msg_dir = Dir.pwd
Dir.chdir("#{File.dirname(__FILE__)}")
# Setup a global binding so the GC doesn't close the file
$RQ_IO = IO.for_fd(ENV['RQ_WRITE'].to_i)
$RQ_RESULT_IO = IO.for_fd(ENV['RQ_READ'].to_i)
# IO tower to RQ mgr process
def write_status(state, mesg)
msg = "#{state} #{mesg}\n"
STDOUT.write("#{Process.pid} - #{Time.now} - #{msg}")
$RQ_IO.syswrite(msg)
end
def read_status()
data = $RQ_RESULT_IO.sysread(512)
data.split(' ', 2)
end
def handle_fail(mesg = 'soft fail')
count = ENV['RQ_COUNT'].to_i
if count > 15
write_status('run', "RQ_COUNT > 15 - failing")
write_status('fail', "RQ_COUNT > 15 - failing")
exit(0)
end
wait_seconds = count * count * 60
write_status('resend', "#{wait_seconds}-#{mesg}")
exit(0)
end
def main
path = "../config/rq_router_rules.rb"
rp = RQ::RuleProcessor.process_pathname(path)
if rp == nil
File.open("../config/rq_router.down", "w") { |f| f.write("bad rules file") }
write_status('err', "Bad rule file at: #{path}")
exit 1
end
trap("ALRM") { puts "#{$$}: program took too long (60 secs). Goodbye"; $stdout.flush; exit! 2 }
Alarm.alarm(60)
# Read current message
data = File.read("#{$rq_msg_dir}/../msg") # TODO: eliminate this cheat
curr_msg = JSON.parse(data)
rule = rp.first_match(curr_msg)
if rule.data[:log]
File.open("../log/rq_router.log", "a") do
|f|
f.write("#{Process.pid} - #{Time.now} - #{rule.data[:desc]} - #{rule.data[:action].to_s} - #{curr_msg['msg_id']} - #{curr_msg['src']}\n")
end
end
if rule.data[:delay] > 0
write_status('run', "router delay #{rule.data[:delay]} seconds")
sleep rule.data[:delay]
end
if rule.data[:action] == :done
write_status('done', "router processed a done")
exit 0
end
if rule.data[:action] == :err
write_status('err', "router processed an err")
exit 1
end
if rule.data[:action] == :balance
host = rule.select_hosts()[0]
new_dest = rp.txform_host(curr_msg['dest'], host)
write_status('dup', "0-X-#{new_dest}")
status, new_id = read_status()
if status == 'ok'
write_status('done', "DUP to #{new_id}")
exit 0
else
write_status('err', "DUP failed - #{new_id}")
exit 1
end
end
if rule.data[:action] == :relay
hosts = rule.select_hosts()
hosts.each {
|h|
new_dest = rp.txform_host(curr_msg['dest'], h)
write_status('dup', "0-X-#{new_dest}")
status, new_id = read_status()
if status == 'ok'
write_status('run', "DUP relay to #{new_id}")
else
write_status('err', "DUP relay failed - #{new_id}")
exit 1
end
}
write_status('done', "done relay of #{hosts.length} messages")
exit 0
end
end
main()